2021SC@SDUSC
Eventloop是异步编程的内部类。在异步编程模型中,必须避免Eventloop线程中的阻塞操作(如I/O或长时间运行的计算)。应使用此类操作的异步版本。Eventloop表示只有一个阻塞操作的无限循环selector.select(),它选择一组键,这些键对应的通道已准备好进行I/O操作。使用从外部添加到Eventloop的带有任务的键和队列,它从方法run()中的一个线程开始异步执行,该方法被覆盖,因为Eventloop是Runnable的实现。当此eventloop没有选定的键且其包含任务的队列为空时,其工作将结束。
该文继上文来看看Eventloop的源代码部分。看看Eventloop的具体操作有哪些。
withEventloopFatalErrorHandler():在事件循环级别设置致命错误。它处理所有未由线程本地错误处理程序fatalErrorHandler处理的错误事件循环级别上的致命错误处理程序
public @NotNull Eventloop withEventloopFatalErrorHandler(@NotNull FatalErrorHandler fatalErrorHandler) {
this.eventloopFatalErrorHandler = fatalErrorHandler;
return this;
}
withThreadFatalErrorHandler():在线程级别设置致命错误这是将使用setThreadFatalErrorHandler(FatalErrorHandler)设置的处理程序。它通常是第一个处理“更靠近”抛出站点fatalErrorHandler的错误的处理程序线程级别上的致命错误处理程序
public @NotNull Eventloop withThreadFatalErrorHandler(@Nullable FatalErrorHandler fatalErrorHandler) {
this.threadFatalErrorHandler = fatalErrorHandler;
return this;
}
closeSelector():如果选择器已打开,则将其关闭。
private void closeSelector() {
if (selector != null) {
try {
selector.close();
selector = null;
cancelledKeys = 0;
} catch (IOException e) {
logger.error("Could not close selector", e);
}
}
}
keepAlive(boolean keepAlive):设置标志(如果设置true),则表示即使所有任*都已执行,此Eventloop的工作仍将继续,并且它没有任何选定的键。用于设置的参数keepAlive标志
public void keepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
if (!keepAlive && selector != null) {
selector.wakeup();
}
}
run():此eventloop处于活动状态时执行任务的Runnable中重写的方法。
@Override
public void run() {
eventloopThread = Thread.currentThread();
if (threadName != null)
eventloopThread.setName(threadName);
if (threadPriority != 0)
eventloopThread.setPriority(threadPriority);
CURRENT_EVENTLOOP.set(this);
if (threadFatalErrorHandler != null)
setThreadFatalErrorHandler(threadFatalErrorHandler);
ensureSelector();
assert selector != null;
breakEventloop = false;
long timeAfterSelectorSelect;
long timeAfterBusinessLogic = 0;
while (isAlive()) {
try {
long selectTimeout = getSelectTimeout();
if (inspector != null) inspector.onUpdateSelectorSelectTimeout(selectTimeout);
if (selectTimeout <= 0) {
lastSelectedKeys = selector.selectNow();
} else {
lastSelectedKeys = selector.select(selectTimeout);
}
cancelledKeys = 0;
} catch (ClosedChannelException e) {
logger.error("Selector is closed, exiting...", e);
break;
} catch (IOException e) {
recordIoError(e, selector);
}
timeAfterSelectorSelect = refreshTimestampAndGet();
int keys = processSelectedKeys(selector.selectedKeys());
int concurrentTasks = executeConcurrentTasks();
int scheduledTasks = executeScheduledTasks();
int backgroundTasks = executeBackgroundTasks();
int localTasks = executeLocalTasks();
if (inspector != null) {
if (timeAfterBusinessLogic != 0) {
long selectorSelectTime = timeAfterSelectorSelect - timeAfterBusinessLogic;
inspector.onUpdateSelectorSelectTime(selectorSelectTime);
}
timeAfterBusinessLogic = timestamp;
boolean taskOrKeyPresent = (keys + concurrentTasks + scheduledTasks + backgroundTasks + localTasks) != 0;
boolean externalTaskPresent = lastExternalTasksCount != 0;
long businessLogicTime = timeAfterBusinessLogic - timeAfterSelectorSelect;
inspector.onUpdateBusinessLogicTime(taskOrKeyPresent, externalTaskPresent, businessLogicTime);
}
loop++;
tick = 0;
}
logger.info("{} finished", this);
eventloopThread = null;
if (selector != null && selector.isOpen() && selector.keys().stream().anyMatch(SelectionKey::isValid)) {
logger.warn("Selector is still open, because event loop {} has {} keys", this, selector.keys());
return;
}
closeSelector();
if (threadFatalErrorHandler != null)
setThreadFatalErrorHandler(null);
}
processSelectedKeys():处理与各种I/O事件相关的选定键:接受、连接、读取、写入。包含所有选定键的selectedKeys集,从NIO Selector.select()返回。
private int processSelectedKeys(@NotNull Set<SelectionKey> selectedKeys) {
long startTimestamp = timestamp;
Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;
int invalidKeys = 0, acceptKeys = 0, connectKeys = 0, readKeys = 0, writeKeys = 0;
Iterator<SelectionKey> iterator = lastSelectedKeys != 0 ? selectedKeys.iterator() : emptyIterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (!key.isValid()) {
invalidKeys++;
continue;
}
if (sw != null) {
sw.reset();
sw.start();
}
if (key.isAcceptable()) {
onAccept(key);
acceptKeys++;
} else if (key.isConnectable()) {
onConnect(key);
connectKeys++;
} else {
if (key.isReadable()) {
onRead(key);
readKeys++;
}
if (key.isValid()) {
if (key.isWritable()) {
onWrite(key);
writeKeys++;
}
} else {
invalidKeys++;
}
}
if (sw != null && inspector != null) inspector.onUpdateSelectedKeyDuration(sw);
}
int keys = acceptKeys + connectKeys + readKeys + writeKeys + invalidKeys;
if (keys != 0) {
long loopTime = refreshTimestampAndGet() - startTimestamp;
if (inspector != null)
inspector.onUpdateSelectedKeysStats(lastSelectedKeys, invalidKeys, acceptKeys, connectKeys, readKeys, writeKeys, loopTime);
}
return keys;
}