package com.ntg.frameWork.act.AppFrame;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import com.ntg.frameWork.act.Event;
import com.ntg.frameWork.act.custprotocal.CustomProtocolParser;
import com.ntg.frameWork.util.Functions;
import com.sun.grizzly.BaseSelectionKeyHandler;
import com.sun.grizzly.Context;
import com.sun.grizzly.Controller;
import com.sun.grizzly.ControllerStateListenerAdapter;
import com.sun.grizzly.DefaultProtocolChain;
import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.ProtocolChainInstanceHandler;
import com.sun.grizzly.SSLConfig;
import com.sun.grizzly.SSLConnectorHandler;
import com.sun.grizzly.SSLSelectorHandler;
import com.sun.grizzly.TCPSelectorHandler;
import com.sun.grizzly.async.AsyncWriteCallbackHandler;
import com.sun.grizzly.filter.ParserProtocolFilter;
import com.sun.grizzly.util.DefaultThreadPool;
/*
* 消息处理基类
* 继承此类都有消息队列处理功能
*/
public abstract class BaseServer implements Runnable{
//是否继续运行
private boolean bKeepingRunning;
//事件队列
private Vector<Event> events;
private int port=8078;
private Controller controller;
private BaseFilter serverDispatcher;
private Controller.Protocol protocol;
private SSLConfig sslConfig;
public BaseServer(){
bKeepingRunning = true;
events = new Vector<Event>();
//注册到全局消息中心
}
/*
* 处理消息
*/
public abstract void OnEvent(Object [] ev) throws InterruptedException ;
/*
* 处理执行异常
*/
public abstract void OnException(Exception e);
/*
* 处理退出
*/
public abstract void OnExit();
public boolean isBKeepingRunning() {
return bKeepingRunning;
}
public void setBKeepingRunning(boolean keepingRunning) {
bKeepingRunning = keepingRunning;
}
public void run(){
try{
while(bKeepingRunning)
dealEvent();
}catch(Exception e){
e.printStackTrace();
OnException(e);
}
OnExit();
}
/*
* 触发消息
*/
public synchronized void RaisEvent(Event et){
events.addElement(et);
notify();
}
private void dealEvent() throws InterruptedException{
Object [] ev = getCurrentEvents();
if(ev==null)return;
OnEvent(ev);
}
private Vector<Event> getEvents() {
return events;
}
/*
* 获取当前消息队列,如果队列为空,则阻塞
*/
private Object[] getCurrentEvents() throws InterruptedException {
Object [] ev = null;
if(getEvents().size()==0){
//只有队列空的时候才进行等待
synchronized(this){
wait();
ev=getEvents().toArray();
//移除的时候要按索引移除,防止移除过程中,有新的加入
for(int k=0;k<ev.length;k++)
getEvents().removeElement(ev[k]);
}
}else{
ev=getEvents().toArray();
for(int k=0;k<ev.length;k++)
getEvents().removeElement(ev[k]);
}
return ev;
}
/**
* Starts this server.
*/
public void startServer(int iPort) {
port = iPort;
//ReplyMessageFactory replyMessageFactory = new ReplyMessageFactory();
controller = new Controller();
DefaultThreadPool defp = new DefaultThreadPool();
defp.setInitialByteBufferSize(Functions.MAXREQLEN);
controller.setThreadPool(defp);
TCPSelectorHandler tcpSelectorHandler =
(protocol == Controller.Protocol.TLS) ? new SSLSelectorHandler() : new TCPSelectorHandler();
BaseSelectionKeyHandler keyHandler = new BaseSelectionKeyHandler();
tcpSelectorHandler.setSelectionKeyHandler(keyHandler);
tcpSelectorHandler.setPort(port);
controller.addSelectorHandler(tcpSelectorHandler);
final DefaultProtocolChain protocolChain = new DefaultProtocolChain();
// protocolChain.addFilter(CustomProtocolParser.createParserProtocolFilter(null, replyMessageFactory, sslConfig));
protocolChain.addFilter( getNewParser());
CustomProtocolParser parser = new CustomProtocolParser(sslConfig);
parser.setBytesArrivedListener(null);
serverDispatcher = new BaseFilter(this);
System.out.println("creteated new dispatcher");
protocolChain.addFilter(serverDispatcher);
protocolChain.setContinuousExecution(true);
controller.setProtocolChainInstanceHandler(
new ProtocolChainInstanceHandler() {
public ProtocolChain poll() {
return protocolChain;
}
public boolean offer(ProtocolChain protocolChain) {
return false;
}
});
System.out.print("Server : Starting server on port :" + port);
startController(controller);
if (sslConfig != null) {
((SSLConnectorHandler) controller.acquireConnectorHandler(protocol)).configure(sslConfig);
System.out.println(" SSL Mode");
} else {
System.out.println();
}
}
/**
* Stops this server
*/
public void stopServer() {
stopController(controller);
System.out.println("Server : Stopping server on port :" + port);
}
public abstract void stop();
public void startController(final Controller controller) {
final CountDownLatch latch = new CountDownLatch(1);
controller.addStateListener(new ControllerStateListenerAdapter() {
@Override
public void onReady() {
latch.countDown();
}
@Override
public void onException(Throwable e) {
if (latch.getCount() > 0) {
Controller.logger().log(Level.SEVERE, "Exception during " +
"starting the controller", e);
latch.countDown();
} else {
Controller.logger().log(Level.SEVERE, "Exception during " +
"controller processing", e);
}
}
});
new Thread(controller).start();
try {
latch.await();
} catch (InterruptedException ex) {
}
if (!controller.isStarted()) {
throw new IllegalStateException("Controller is not started!");
}
}
/**
* Stop controller in seperate thread
*/
public void stopController(Controller controller) {
try {
controller.stop();
} catch(IOException e) {
e.printStackTrace();
}
}
/*
* 实现接收解析器
*/
public abstract ParserProtocolFilter getNewParser();
public abstract void service(Object reqObj,final Context ctx);
}
package com.ntg.frameWork.act.AppFrame;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import com.sun.grizzly.Context;
import com.sun.grizzly.ProtocolFilter;
import com.sun.grizzly.ProtocolParser;
public class BaseFilter implements ProtocolFilter {
// private ExecutorService executorService = null;
// protected final ThreadGroup threadGroup = new ThreadGroup("GrizzlySample");
// protected int threadCounter = 0;
private boolean shuttingDown = false;
private BaseServer theServer;
BaseFilter(BaseServer bserver){
super();
theServer = bserver;
}
public boolean postExecute(Context ctx) throws IOException {
return true;
}
public boolean execute(final Context workerCtx) throws IOException {
final Object incomingMessage = workerCtx.removeAttribute(ProtocolParser.MESSAGE);
workerCtx.incrementRefCount();
try {
theServer.service(incomingMessage ,workerCtx);
} catch (RejectedExecutionException exception) {
workerCtx.getController().returnContext(workerCtx);
if (shuttingDown) {
//Ok do nothing because client is shutting down
} else {
exception.printStackTrace();
}
}
catch (Throwable exception) {
exception.printStackTrace();
workerCtx.getController().returnContext(workerCtx);
}
return false;
}
// public void onMessageError(Object msg, Context ctx) {
// System.out.println("error");
// }
// public void onRequestMessage(Object msg, Context ctx) {
//
// ByteBuffer retObj = theServer.service(msg ,ctx);
//
// try {
// ctx.getAsyncQueueWritable().writeToAsyncQueue(retObj, theServer);
//
// } catch (IOException e) {
// if (ctx.getSelectionKey().isValid()) {
// theServer.onException(e, null, retObj, null);
// }
// }
// }
// public void stop() {
// if (executorService != null) {
// shuttingDown = true;
// executorService.shutdown();
// try {
// executorService.awaitTermination(2, TimeUnit.SECONDS);
// } catch (InterruptedException e) {
//
// }
// executorService.isTerminated();
// }
// }
}
package com.ntg.frameWork.act;
/*
* 消息队列
*/
public class Event{
/*
* 事件类型:
* ACTION:收到完整请求
* PUSH:需要推送数据
*/
private String type;
private Object eventObj;
public Object getEventObj() {
return eventObj;
}
public void setEventObj(Object eventObj) {
this.eventObj = eventObj;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.ntg.frameWork.act.BaseActForm;
import com.ntg.frameWork.act.BaseAction;
import com.ntg.frameWork.act.DataForm;
import com.ntg.frameWork.act.Event;
import com.ntg.frameWork.act.FormCreator;
import com.ntg.frameWork.act.PageForm;
import com.ntg.frameWork.act.AppFrame.BaseServer;
import com.ntg.frameWork.cache.BusinessCache;
import com.ntg.frameWork.cache.CacheUtil;
import com.ntg.frameWork.config.AppConfig;
import com.ntg.frameWork.config.Protocal;
import com.ntg.frameWork.page.PageManager;
import com.sun.grizzly.Context;
import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.filter.ParserProtocolFilter;
import com.wutka.jox.JOXBeanInputStream;
public class AppServer extends BaseServer {
private ExecutorService executorService = null;
protected final ThreadGroup threadGroup = new ThreadGroup("GrizzlySample");
protected int threadCounter = 0;
private boolean shuttingDown = false;
private static String ClassName = AppServer.class.getName();
private Logger log;
private AppConfig appcfg;
public AppServer() {
super();
// 初始化
init();
}
public void service(final Object retObj, final Context workerCtx) {
synchronized (this) {
if (executorService == null) {
executorService = Executors
.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(threadGroup, r,
"SampleThread No." + (++threadCounter));
}
});
}
}
executorService.execute(new Runnable() {
public void run() {
});
}
/*
* 处理消息
*/
public void OnEvent(Object[] ev) throws InterruptedException {
for (int i = 0; i < ev.length; i++) {
Event evObj = (Event) ev[i];
if (Functions.EVENT_EXIT.endsWith(evObj.getType())) {
// 退出线程
setBKeepingRunning(false);
break;
}
}
}
/*
* 处理退出
*/
public void OnExit() {
stop();
}
/*
* 处理运行时异常
*/
public void OnException(Exception e) {
}
public ParserProtocolFilter getNewParser() {
return new ParserProtocolFilter() {
public ProtocolParser newProtocolParser() {
CustomProtocolParser parser = new CustomProtocolParser(null);
parser.setBytesArrivedListener(null);
// parser.setReplyMessageFactory(replyInputStreamFactory);
return parser;
}
};
}
public void shutdown() {
super.stopServer();
this.stop();
}
public void stop() {
// try {
// mMapUpdateTask mmt= new MLineTimer().new mMapUpdateTask();
// mmt.objectSerial();
// } catch (Exception e1) {
// // TODO 自动生成 catch 块
// e1.printStackTrace();
// }
if (executorService != null) {
shuttingDown = true;
executorService.shutdown();
try {
executorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
executorService.isTerminated();
}
try {
// ServerSocket
// SvrSocket.close();
// 行情
if (server != null && server.isAlive())
server.interrupt();
// 资讯定时刷新cache
if (refreshZixunJob != null) {
refreshZixunJob.interrupt();
refreshZixunJob.join(1000);
}
// 资讯定时刷新cache
if (refreshVideoJob != null) {
refreshVideoJob.interrupt();
refreshVideoJob.join(1000);
}
if (tokenToDBJob != null) {
tokenToDBJob.interrupt();
tokenToDBJob.join(1000);
}
// // 港股行情
// if (HKServerSocket != null){
// HKServerSocket.interrupt();
// HKServerSocket.join(1000);
// }
MapDBPool.instance().emptyPool();
//QueryContent.close();
StockFromServer.terminate();
if (xqmng != null)
xqmng.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
protected void init() {
// 读取配置
}
private void configLog4jPath() {
String logPath = Functions.getAppConfigPath() + "log4j.properties";
PropertyConfigurator.configure(logPath);
log = Logger.getLogger(ClassName);
log.info("logPath=" + logPath);
}
private void loadCacheDefaultValue() {
}
public static void main(String[] args) {
AppServer appServer = new AppServer();
Thread app = new Thread(appServer, "AppServer");
// 判断参数,-stop为停止应用服务器,否则为启动服务器
app.start();
appServer.stopServer();
}
}
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import com.ntg.frameWork.act.BaseActForm;
import com.ntg.frameWork.act.BaseAction;
import com.ntg.frameWork.act.DataForm;
import com.ntg.frameWork.act.Event;
import com.ntg.frameWork.act.FormCreator;
import com.ntg.frameWork.act.PageForm;
import com.ntg.frameWork.act.AppFrame.BaseServer;
import com.ntg.frameWork.cache.BusinessCache;
import com.ntg.frameWork.cache.CacheUtil;
import com.ntg.frameWork.config.AppConfig;
import com.ntg.frameWork.config.Protocal;
import com.ntg.frameWork.page.PageManager;
import com.sun.grizzly.Context;
import com.sun.grizzly.ProtocolParser;
import com.sun.grizzly.filter.ParserProtocolFilter;
import com.wutka.jox.JOXBeanInputStream;
public class AppServer extends BaseServer {
private ExecutorService executorService = null;
protected final ThreadGroup threadGroup = new ThreadGroup("GrizzlySample");
protected int threadCounter = 0;
private boolean shuttingDown = false;
private static String ClassName = AppServer.class.getName();
private Logger log;
private AppConfig appcfg;
public AppServer() {
super();
// 初始化
init();
}
public void service(final Object retObj, final Context workerCtx) {
synchronized (this) {
if (executorService == null) {
executorService = Executors
.newCachedThreadPool(new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(threadGroup, r,
"SampleThread No." + (++threadCounter));
}
});
}
}
executorService.execute(new Runnable() {
public void run() {
});
}
/*
* 处理消息
*/
public void OnEvent(Object[] ev) throws InterruptedException {
for (int i = 0; i < ev.length; i++) {
Event evObj = (Event) ev[i];
if (Functions.EVENT_EXIT.endsWith(evObj.getType())) {
// 退出线程
setBKeepingRunning(false);
break;
}
}
}
/*
* 处理退出
*/
public void OnExit() {
stop();
}
/*
* 处理运行时异常
*/
public void OnException(Exception e) {
}
public ParserProtocolFilter getNewParser() {
return new ParserProtocolFilter() {
public ProtocolParser newProtocolParser() {
CustomProtocolParser parser = new CustomProtocolParser(null);
parser.setBytesArrivedListener(null);
// parser.setReplyMessageFactory(replyInputStreamFactory);
return parser;
}
};
}
public void shutdown() {
super.stopServer();
this.stop();
}
public void stop() {
// try {
// mMapUpdateTask mmt= new MLineTimer().new mMapUpdateTask();
// mmt.objectSerial();
// } catch (Exception e1) {
// // TODO 自动生成 catch 块
// e1.printStackTrace();
// }
if (executorService != null) {
shuttingDown = true;
executorService.shutdown();
try {
executorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
executorService.isTerminated();
}
try {
// ServerSocket
// SvrSocket.close();
// 行情
if (server != null && server.isAlive())
server.interrupt();
// 资讯定时刷新cache
if (refreshZixunJob != null) {
refreshZixunJob.interrupt();
refreshZixunJob.join(1000);
}
// 资讯定时刷新cache
if (refreshVideoJob != null) {
refreshVideoJob.interrupt();
refreshVideoJob.join(1000);
}
if (tokenToDBJob != null) {
tokenToDBJob.interrupt();
tokenToDBJob.join(1000);
}
// // 港股行情
// if (HKServerSocket != null){
// HKServerSocket.interrupt();
// HKServerSocket.join(1000);
// }
MapDBPool.instance().emptyPool();
//QueryContent.close();
StockFromServer.terminate();
if (xqmng != null)
xqmng.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
protected void init() {
// 读取配置
}
private void configLog4jPath() {
String logPath = Functions.getAppConfigPath() + "log4j.properties";
PropertyConfigurator.configure(logPath);
log = Logger.getLogger(ClassName);
log.info("logPath=" + logPath);
}
private void loadCacheDefaultValue() {
}
public static void main(String[] args) {
AppServer appServer = new AppServer();
Thread app = new Thread(appServer, "AppServer");
// 判断参数,-stop为停止应用服务器,否则为启动服务器
app.start();
appServer.stopServer();
}
}