1、ZBusconfig.java, zbus的启动、生产、回调处理消息的方法。
package com.accenture.icc.zbus.config;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;
@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig {
private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);
@Value("${zbus.mq.sub}")
private String subMq;
@Value("${zbus.mq.recv}")
private String recvMq;
@Value("${zbus.mq.alarm}")
private String alarmMq;
@Value("${zbus.mq.subCommonData}")
private String subIndexDataMq;
@Value("${zbus.mq.recvCommonData}")
private String indexDataMq;
@Value("${csv.power.maxsize}")
private int powerMaxListSize;
@Value("${csv.power.period}")
private int powerPeriod;
@Value("${csv.consumption.maxsize}")
private int consumptionMaxListSize;
@Value("${csv.consumption.period}")
private int consumptionPeriod;
@Autowired
private TripleDataWrapper powerWrapper;
@Autowired
private TripleDataWrapper consumptionWrapper;
@Autowired
private TripleDataWrapper transformerWrapper;
@Autowired
private TripleDataWrapper powerFactorWrapper;
@Autowired
private TripleDataWrapper unbalanceWrapper;
@Autowired
private TripleDataWrapper stationParamWrapper;
/**
* 回调函数
* @param messaging
* @return
*/
@Bean
public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) {
ConsumerHandler consumerHandler = new ConsumerHandler() {
@Override
public void handle(Message msg, Consumer consumer) throws IOException {
logger.info("RECEIVING MESSAGE: {}", msg.getBodyString());
String[] fields = msg.getBodyString().split(",");
if(fields.length < 5){
return;
}
String tag = fields[0];
int tableId = 0;
int recordId = 0;
int fieldId = 0;
double value = 0;
String timeStamp = "";
try{
tableId = Integer.parseInt(fields[1]);
recordId = Integer.parseInt(fields[2]);
fieldId = Integer.parseInt(fields[3]);
value = Double.parseDouble(fields[4]);
if(value < 0.01 || value == 128) {
value = 0;
}
timeStamp = fields[6];
} catch (NumberFormatException e) {
logger.info("message format error.");
return;
}
AnalogInputData analogInputData = new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp);
if(powerWrapper.containsRecord(recordId)){
List<AnalogInputData> datalist = powerWrapper.getDataListByRecordid(recordId);
if(datalist.size() == powerMaxListSize) {
datalist.remove(0);
}
datalist.add(analogInputData);
}
if(consumptionWrapper.containsRecord(recordId)){
List<AnalogInputData> datalist = consumptionWrapper.getDataListByRecordid(recordId);
if(datalist.size() == consumptionMaxListSize) {
datalist.remove(0);
}
datalist.add(analogInputData);
}
if(transformerWrapper.containsRecord(recordId)) {
List<AnalogInputData> datalist = transformerWrapper.getDataListByRecordid(recordId);
if(datalist.size() == powerMaxListSize) {
datalist.remove(0);
}
datalist.add(analogInputData);
}
if(powerFactorWrapper.containsRecord(recordId)) {
List<AnalogInputData> datalist = powerFactorWrapper.getDataListByRecordid(recordId);
if(datalist.size() == powerMaxListSize) {
datalist.remove(0);
}
datalist.add(analogInputData);
}
if(unbalanceWrapper.containsRecord(recordId)) {
List<AnalogInputData> datalist = unbalanceWrapper.getDataListByRecordid(recordId);
if(datalist.size() == powerMaxListSize) {
datalist.remove(0);
}
datalist.add(analogInputData);
}
broadcastAnalog(recordId, messaging);
}
};
return consumerHandler;
}
private void broadcastAnalog(int recordId, SimpMessageSendingOperations messaging) {
Map<String, Object> resMap = new HashMap<>();
resMap.put("status", 1000);
int groupid = -1;
int factoryid = -1;
List<DataUnit> dataUnits = null;
String destination = "";
if(powerWrapper.containsRecord(recordId)){
groupid = powerWrapper.getGroupIdByRecord(recordId);
dataUnits = powerWrapper.getDataUnitsByGroupid(groupid);
factoryid = powerWrapper.getFactoryIdByRecord(recordId);
destination = "/topic/analogs/"+factoryid+"/1";
resMap.put("dataUnits", dataUnits);
resMap.put("groupid", groupid);
messaging.convertAndSend(destination, resMap);
}
if(consumptionWrapper.containsRecord(recordId)){
groupid = consumptionWrapper.getGroupIdByRecord(recordId);
dataUnits = consumptionWrapper.getDataUnitsByGroupid(groupid);
factoryid = consumptionWrapper.getFactoryIdByRecord(recordId);
destination = "/topic/consumption/"+factoryid+"/1";
resMap.put("dataUnits", dataUnits);
resMap.put("groupid", groupid);
messaging.convertAndSend(destination, resMap);
}
if(transformerWrapper.containsRecord(recordId)) {
groupid = transformerWrapper.getGroupIdByRecord(recordId);
dataUnits = transformerWrapper.getDataUnitsByGroupid(groupid);
factoryid = transformerWrapper.getFactoryIdByRecord(recordId);
destination = "/topic/analogs/"+factoryid+"/2";
resMap.put("dataUnits", dataUnits);
resMap.put("groupid", groupid);
messaging.convertAndSend(destination, resMap);
}
if(powerFactorWrapper.containsRecord(recordId)) {
groupid = powerFactorWrapper.getGroupIdByRecord(recordId);
dataUnits = powerFactorWrapper.getDataUnitsByGroupid(groupid);
factoryid = powerFactorWrapper.getFactoryIdByRecord(recordId);
destination = "/topic/analogs/"+factoryid+"/3";
resMap.put("dataUnits", dataUnits);
resMap.put("groupid", groupid);
messaging.convertAndSend(destination, resMap);
}
if(unbalanceWrapper.containsRecord(recordId)) {
groupid = unbalanceWrapper.getGroupIdByRecord(recordId);
dataUnits = unbalanceWrapper.getDataUnitsByGroupid(groupid);
factoryid = unbalanceWrapper.getFactoryIdByRecord(recordId);
destination = "/topic/analogs/"+factoryid+"/4";
resMap.put("dataUnits", dataUnits);
resMap.put("groupid", groupid);
messaging.convertAndSend(destination, resMap);
}
}
/**
* @return
* @throws IOException
*/
@Bean(destroyMethod="close")
public Broker broker(@Value("${zbus.address}") String zbusAddress) throws IOException {
Broker broker = new ZbusBroker(zbusAddress);
return broker;
}
/**
* @param consumerHandler
* @return
* @throws IOException
* @throws InterruptedException
*/
@Bean(initMethod="start")
public Consumer zbusConsumer(Broker broker, ConsumerHandler consumerHandler) throws IOException, InterruptedException {
MqAdmin mqAdmin = new MqAdmin(broker, recvMq);
mqAdmin.removeMQ();
Consumer consumer = new Consumer(broker, recvMq);
consumer.onMessage(consumerHandler);
return consumer;
}
@Bean(initMethod="start")
public Consumer zbusConsumerAlarm(Broker broker, ConsumerHandler consumerHandlerAlarm) throws IOException, InterruptedException {
MqAdmin mqAdminAlarm = new MqAdmin(broker, alarmMq);
mqAdminAlarm.removeMQ();
Consumer consumerAlarm = new Consumer(broker, alarmMq);
consumerAlarm.onMessage(consumerHandlerAlarm);
return consumerAlarm;
}
@Bean(initMethod="start")
public Consumer zbusConsumerIndexData(Broker broker, ConsumerHandler consumerHandlerIndexData) throws IOException, InterruptedException{
MqAdmin mqAdmin2 = new MqAdmin(broker, indexDataMq);
mqAdmin2.removeMQ();
Consumer consumerIndexData = new Consumer(broker, indexDataMq);
consumerIndexData.onMessage(consumerHandlerIndexData);
return consumerIndexData;
}
/**
* @return
* @throws IOException
* @throws InterruptedException
*/
@Bean
public Producer zbusProducer(Broker broker) throws IOException, InterruptedException {
List<Integer> powerRecords = powerWrapper.getRecordIds();
List<Integer> consumptionRecords = consumptionWrapper.getRecordIds();
List<Integer> transformerRecords = transformerWrapper.getRecordIds();
List<Integer> stationParamRecords = stationParamWrapper.getRecordIds();
Producer producer = new Producer(broker, subMq);
producer.createMQ();
subscribeData(producer, powerRecords, powerPeriod);
subscribeData(producer, consumptionRecords, consumptionPeriod);
subscribeData(producer, transformerRecords, powerPeriod);
producer.setMq(subIndexDataMq);
producer.createMQ();
subscribeData(producer, stationParamRecords, powerPeriod);
return producer;
}
/**
* @param producer
* @param records
* @param period
* @throws IOException
* @throws InterruptedException
*/
private void subscribeData(Producer producer, List<Integer> records, int period)
throws IOException, InterruptedException {
for(Integer recordid : records) {
long periodMillis = period * 60000;
String msgbody = String.format("sub:%d,101,%d,7,%d", recordid, recordid, periodMillis);
Message message = new Message();
message.setBody(msgbody);
message = producer.sendSync(message);
}
}
@Bean
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
return new PropertySourcesPlaceholderConfigurer();
}
}
2、WebSocketStompConfig.java, websocket的配置
package com.accenture.icc.zbus.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
/**
* STOMP 配置
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer{
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/message").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(5242880); //5MB
container.setMaxBinaryMessageBufferSize(5242880);
return container;
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
super.configureWebSocketTransport(registration);
registration.setMessageSizeLimit(5242880);//5mb, default 64kb
registration.setSendBufferSizeLimit(5242880);
}
}
<script type="text/javascript" src="<%=basePath%>resources/js/sockjs.min.js"></script>
<script type="text/javascript" src="<%=basePath%>resources/js/stomp.min.js"></script>
function initUnbalanceDataStomp(){
var sockurl = "<%=basePath %>views/message";
var socket = new SockJS(sockurl);
var stompClient = Stomp.over(socket);
// callback function to be called when stomp client is connected to server
var connectCallback = function() {
console.log('connected!');
// 订阅analog数据
stompClient.subscribe('/topic/analogs/'+'${sessionScope.station.rtuId}'+'/4', function(data){
var body = JSON.parse(data.body);
if(body.status==1000) {
console.log("do some thing");
}
});
};
// callback function to be called when stomp client could not connect to server
var errorCallback = function(error) {
// display the error's message header:
console.log(error);
};
stompClient.connect("guest", "guest", connectCallback, errorCallback);
}