2021SC@SDUSC
Eventloop是异步编程的内部类。在异步编程模型中,必须避免Eventloop线程中的阻塞操作(如I/O或长时间运行的计算)。应使用此类操作的异步版本。Eventloop表示只有一个阻塞操作的无限循环selector.select(),它选择一组键,这些键对应的通道已准备好进行I/O操作。使用从外部添加到Eventloop的带有任务的键和队列,它从方法run()中的一个线程开始异步执行,该方法被覆盖,因为Eventloop是Runnable的实现。当此eventloop没有选定的键且其包含任务的队列为空时,其工作将结束。
继续查看Eventloop的源码工作。
executeLocalTasks():执行从当前线程添加的本地任务
private int executeLocalTasks() {
long startTimestamp = timestamp;
int localTasks = 0;
Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;
while (true) {
Runnable runnable = this.localTasks.poll();
if (runnable == null) {
break;
}
if (sw != null) {
sw.reset();
sw.start();
}
try {
executeTask(runnable);
tick++;
if (sw != null && inspector != null) inspector.onUpdateLocalTaskDuration(runnable, sw);
} catch (Throwable e) {
onFatalError(e, runnable);
}
localTasks++;
}
this.localTasks.addAll(nextTasks);
this.nextTasks.clear();
if (localTasks != 0) {
long loopTime = refreshTimestampAndGet() - startTimestamp;
if (inspector != null) inspector.onUpdateLocalTasksStats(localTasks, loopTime);
}
return localTasks;
}
executeConcurrentTasks():执行从其他线程添加的并发任务。
private int executeConcurrentTasks() {
long startTimestamp = timestamp;
int concurrentTasks = 0;
Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;
while (true) {
Runnable runnable = this.concurrentTasks.poll();
if (runnable == null) {
break;
}
if (sw != null) {
sw.reset();
sw.start();
}
try {
executeTask(runnable);
if (sw != null && inspector != null) inspector.onUpdateConcurrentTaskDuration(runnable, sw);
} catch (Throwable e) {
onFatalError(e, runnable);
}
concurrentTasks++;
}
if (concurrentTasks != 0) {
long loopTime = refreshTimestampAndGet() - startTimestamp;
if (inspector != null) inspector.onUpdateConcurrentTasksStats(concurrentTasks, loopTime);
}
return concurrentTasks;
}
executeScheduledTasks():执行计划在特定时间戳执行的任务
private int executeScheduledTasks() {
return executeScheduledTasks(scheduledTasks);
}
private int executeBackgroundTasks() {
return executeScheduledTasks(backgroundTasks);
}
private int executeScheduledTasks(PriorityQueue<ScheduledRunnable> taskQueue) {
long startTimestamp = timestamp;
boolean background = taskQueue == backgroundTasks;
int scheduledTasks = 0;
Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;
for (; ; ) {
ScheduledRunnable peeked = taskQueue.peek();
if (peeked == null)
break;
if (peeked.isCancelled()) {
taskQueue.poll();
continue;
}
if (peeked.getTimestamp() > currentTimeMillis()) {
break;
}
taskQueue.poll();
Runnable runnable = peeked.getRunnable();
if (sw != null) {
sw.reset();
sw.start();
}
if (monitoring && inspector != null) {
int overdue = (int) (System.currentTimeMillis() - peeked.getTimestamp());
inspector.onScheduledTaskOverdue(overdue, background);
}
try {
executeTask(runnable);
tick++;
peeked.complete();
if (sw != null && inspector != null) inspector.onUpdateScheduledTaskDuration(runnable, sw, background);
} catch (Throwable e) {
onFatalError(e, runnable);
}
scheduledTasks++;
}
if (scheduledTasks != 0) {
long loopTime = refreshTimestampAndGet() - startTimestamp;
if (inspector != null) inspector.onUpdateScheduledTasksStats(scheduledTasks, loopTime, background);
}
return scheduledTasks;
}
onAccept(SelectionKey key):接受传入的socketChannel连接,而不阻塞eventloop线程。
private void onAccept(SelectionKey key) {
assert inEventloopThread();
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
if (!serverSocketChannel.isOpen()) { // TODO - remove?
key.cancel();
return;
}
//noinspection unchecked
Consumer<SocketChannel> acceptCallback = (Consumer<SocketChannel>) key.attachment();
for (; ; ) {
SocketChannel channel;
try {
channel = serverSocketChannel.accept();
if (channel == null)
break;
channel.configureBlocking(false);
} catch (ClosedChannelException e) {
break;
} catch (IOException e) {
recordIoError(e, serverSocketChannel);
break;
}
try {
acceptCallback.accept(channel);
} catch (Throwable e) {
handleError(eventloopFatalErrorHandler, e, acceptCallback);
closeChannel(channel, null);
}
}
}
onConnect(SelectionKey key):处理新建立的TCP连接,而不阻塞eventloop线程。
private void onConnect(SelectionKey key) {
assert inEventloopThread();
@SuppressWarnings("unchecked") Callback<SocketChannel> cb = (Callback<SocketChannel>) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
boolean connected;
try {
connected = channel.finishConnect();
} catch (IOException e) {
closeChannel(channel, key);
cb.accept(null, e);
return;
}
try {
if (connected) {
cb.accept(channel, null);
} else {
cb.accept(null, new IOException("Connection key was received but the channel was not connected - " +
"this is not possible without some bug in Java NIO"));
}
} catch (Throwable e) {
handleError(eventloopFatalErrorHandler, e, channel);
closeChannel(channel, null);
}
}
onRead(SelectionKey key):在不阻塞事件循环线程的情况下,处理可用于读取的SocketChannel。
private void onRead(SelectionKey key) {
assert inEventloopThread();
NioChannelEventHandler handler = (NioChannelEventHandler) key.attachment();
try {
handler.onReadReady();
} catch (Throwable e) {
handleError(eventloopFatalErrorHandler, e, handler);
closeChannel(key.channel(), null);
}
}
onWrite(SelectionKey key):在不阻塞线程的情况下处理可用于写入的SocketChannel。
private void onWrite(SelectionKey key) {
assert inEventloopThread();
NioChannelEventHandler handler = (NioChannelEventHandler) key.attachment();
try {
handler.onWriteReady();
} catch (Throwable e) {
handleError(eventloopFatalErrorHandler, e, handler);
closeChannel(key.channel(), null);
}
}
listen():创建侦听InetSocketAddress的ServerSocketChannel。
public @NotNull ServerSocketChannel listen(@Nullable InetSocketAddress address, @NotNull ServerSocketSettings serverSocketSettings, @NotNull Consumer<SocketChannel> acceptCallback) throws IOException {
if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
ServerSocketChannel serverSocketChannel = null;
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketSettings.applySettings(serverSocketChannel);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(address, serverSocketSettings.getBacklog());
serverSocketChannel.register(ensureSelector(), SelectionKey.OP_ACCEPT, acceptCallback);
if (selector != null) {
selector.wakeup();
}
return serverSocketChannel;
} catch (IOException e) {
if (serverSocketChannel != null) {
closeChannel(serverSocketChannel, null);
}
throw e;
}
}
createDatagramChannel();在此EventLoop中注册新的UDP连接。
public static @NotNull DatagramChannel createDatagramChannel(DatagramSocketSettings datagramSocketSettings,
@Nullable InetSocketAddress bindAddress,
@Nullable InetSocketAddress connectAddress) throws IOException {
DatagramChannel datagramChannel = null;
try {
datagramChannel = DatagramChannel.open();
datagramSocketSettings.applySettings(datagramChannel);
datagramChannel.configureBlocking(false);
datagramChannel.bind(bindAddress);
if (connectAddress != null) {
datagramChannel.connect(connectAddress);
}
return datagramChannel;
} catch (IOException e) {
if (datagramChannel != null) {
try {
datagramChannel.close();
} catch (Exception nested) {
logger.error("Failed closing datagram channel after I/O error", nested);
e.addSuppressed(nested);
}
}
throw e;
}
}