当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ框架学习——Async I/O之Eventloop(三)

呼延沈义
2023-12-01

2021SC@SDUSC

Eventloop是异步编程的内部类。在异步编程模型中,必须避免Eventloop线程中的阻塞操作(如I/O或长时间运行的计算)。应使用此类操作的异步版本。Eventloop表示只有一个阻塞操作的无限循环selector.select(),它选择一组键,这些键对应的通道已准备好进行I/O操作。使用从外部添加到Eventloop的带有任务的键和队列,它从方法run()中的一个线程开始异步执行,该方法被覆盖,因为Eventloop是Runnable的实现。当此eventloop没有选定的键且其包含任务的队列为空时,其工作将结束。
继续查看Eventloop的源码工作。

executeLocalTasks():执行从当前线程添加的本地任务
	private int executeLocalTasks() {
		long startTimestamp = timestamp;

		int localTasks = 0;

		Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;

		while (true) {
			Runnable runnable = this.localTasks.poll();
			if (runnable == null) {
				break;
			}

			if (sw != null) {
				sw.reset();
				sw.start();
			}

			try {
				executeTask(runnable);
				tick++;
				if (sw != null && inspector != null) inspector.onUpdateLocalTaskDuration(runnable, sw);
			} catch (Throwable e) {
				onFatalError(e, runnable);
			}
			localTasks++;
		}

		this.localTasks.addAll(nextTasks);
		this.nextTasks.clear();

		if (localTasks != 0) {
			long loopTime = refreshTimestampAndGet() - startTimestamp;
			if (inspector != null) inspector.onUpdateLocalTasksStats(localTasks, loopTime);
		}

		return localTasks;
	}
executeConcurrentTasks():执行从其他线程添加的并发任务。
	private int executeConcurrentTasks() {
		long startTimestamp = timestamp;

		int concurrentTasks = 0;

		Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;

		while (true) {
			Runnable runnable = this.concurrentTasks.poll();
			if (runnable == null) {
				break;
			}

			if (sw != null) {
				sw.reset();
				sw.start();
			}

			try {
				executeTask(runnable);
				if (sw != null && inspector != null) inspector.onUpdateConcurrentTaskDuration(runnable, sw);
			} catch (Throwable e) {
				onFatalError(e, runnable);
			}
			concurrentTasks++;
		}

		if (concurrentTasks != 0) {
			long loopTime = refreshTimestampAndGet() - startTimestamp;
			if (inspector != null) inspector.onUpdateConcurrentTasksStats(concurrentTasks, loopTime);
		}

		return concurrentTasks;
	}
executeScheduledTasks():执行计划在特定时间戳执行的任务
	private int executeScheduledTasks() {
		return executeScheduledTasks(scheduledTasks);
	}

	private int executeBackgroundTasks() {
		return executeScheduledTasks(backgroundTasks);
	}

	private int executeScheduledTasks(PriorityQueue<ScheduledRunnable> taskQueue) {
		long startTimestamp = timestamp;
		boolean background = taskQueue == backgroundTasks;

		int scheduledTasks = 0;
		Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;

		for (; ; ) {
			ScheduledRunnable peeked = taskQueue.peek();
			if (peeked == null)
				break;
			if (peeked.isCancelled()) {
				taskQueue.poll();
				continue;
			}
			if (peeked.getTimestamp() > currentTimeMillis()) {
				break;
			}
			taskQueue.poll();

			Runnable runnable = peeked.getRunnable();
			if (sw != null) {
				sw.reset();
				sw.start();
			}

			if (monitoring && inspector != null) {
				int overdue = (int) (System.currentTimeMillis() - peeked.getTimestamp());
				inspector.onScheduledTaskOverdue(overdue, background);
			}

			try {
				executeTask(runnable);
				tick++;
				peeked.complete();
				if (sw != null && inspector != null) inspector.onUpdateScheduledTaskDuration(runnable, sw, background);
			} catch (Throwable e) {
				onFatalError(e, runnable);
			}

			scheduledTasks++;
		}

		if (scheduledTasks != 0) {
			long loopTime = refreshTimestampAndGet() - startTimestamp;
			if (inspector != null) inspector.onUpdateScheduledTasksStats(scheduledTasks, loopTime, background);
		}

		return scheduledTasks;
	}
onAccept(SelectionKey key):接受传入的socketChannel连接,而不阻塞eventloop线程。
	private void onAccept(SelectionKey key) {
		assert inEventloopThread();

		ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
		if (!serverSocketChannel.isOpen()) { // TODO - remove?
			key.cancel();
			return;
		}

		//noinspection unchecked
		Consumer<SocketChannel> acceptCallback = (Consumer<SocketChannel>) key.attachment();
		for (; ; ) {
			SocketChannel channel;
			try {
				channel = serverSocketChannel.accept();
				if (channel == null)
					break;
				channel.configureBlocking(false);
			} catch (ClosedChannelException e) {
				break;
			} catch (IOException e) {
				recordIoError(e, serverSocketChannel);
				break;
			}

			try {
				acceptCallback.accept(channel);
			} catch (Throwable e) {
				handleError(eventloopFatalErrorHandler, e, acceptCallback);
				closeChannel(channel, null);
			}
		}
	}
onConnect(SelectionKey key):处理新建立的TCP连接,而不阻塞eventloop线程。
	private void onConnect(SelectionKey key) {
		assert inEventloopThread();
		@SuppressWarnings("unchecked") Callback<SocketChannel> cb = (Callback<SocketChannel>) key.attachment();
		SocketChannel channel = (SocketChannel) key.channel();
		boolean connected;
		try {
			connected = channel.finishConnect();
		} catch (IOException e) {
			closeChannel(channel, key);
			cb.accept(null, e);
			return;
		}

		try {
			if (connected) {
				cb.accept(channel, null);
			} else {
				cb.accept(null, new IOException("Connection key was received but the channel was not connected - " +
						"this is not possible without some bug in Java NIO"));
			}
		} catch (Throwable e) {
			handleError(eventloopFatalErrorHandler, e, channel);
			closeChannel(channel, null);
		}
	}
onRead(SelectionKey key):在不阻塞事件循环线程的情况下,处理可用于读取的SocketChannel。
	private void onRead(SelectionKey key) {
		assert inEventloopThread();
		NioChannelEventHandler handler = (NioChannelEventHandler) key.attachment();
		try {
			handler.onReadReady();
		} catch (Throwable e) {
			handleError(eventloopFatalErrorHandler, e, handler);
			closeChannel(key.channel(), null);
		}
	}
onWrite(SelectionKey key):在不阻塞线程的情况下处理可用于写入的SocketChannel。
	private void onWrite(SelectionKey key) {
		assert inEventloopThread();
		NioChannelEventHandler handler = (NioChannelEventHandler) key.attachment();
		try {
			handler.onWriteReady();
		} catch (Throwable e) {
			handleError(eventloopFatalErrorHandler, e, handler);
			closeChannel(key.channel(), null);
		}
	}
listen():创建侦听InetSocketAddress的ServerSocketChannel。
	public @NotNull ServerSocketChannel listen(@Nullable InetSocketAddress address, @NotNull ServerSocketSettings serverSocketSettings, @NotNull Consumer<SocketChannel> acceptCallback) throws IOException {
		if (CHECK) checkState(inEventloopThread(), "Not in eventloop thread");
		ServerSocketChannel serverSocketChannel = null;
		try {
			serverSocketChannel = ServerSocketChannel.open();
			serverSocketSettings.applySettings(serverSocketChannel);
			serverSocketChannel.configureBlocking(false);
			serverSocketChannel.bind(address, serverSocketSettings.getBacklog());
			serverSocketChannel.register(ensureSelector(), SelectionKey.OP_ACCEPT, acceptCallback);
			if (selector != null) {
				selector.wakeup();
			}
			return serverSocketChannel;
		} catch (IOException e) {
			if (serverSocketChannel != null) {
				closeChannel(serverSocketChannel, null);
			}
			throw e;
		}
	}
createDatagramChannel();在此EventLoop中注册新的UDP连接。
	public static @NotNull DatagramChannel createDatagramChannel(DatagramSocketSettings datagramSocketSettings,
			@Nullable InetSocketAddress bindAddress,
			@Nullable InetSocketAddress connectAddress) throws IOException {
		DatagramChannel datagramChannel = null;
		try {
			datagramChannel = DatagramChannel.open();
			datagramSocketSettings.applySettings(datagramChannel);
			datagramChannel.configureBlocking(false);
			datagramChannel.bind(bindAddress);
			if (connectAddress != null) {
				datagramChannel.connect(connectAddress);
			}
			return datagramChannel;
		} catch (IOException e) {
			if (datagramChannel != null) {
				try {
					datagramChannel.close();
				} catch (Exception nested) {
					logger.error("Failed closing datagram channel after I/O error", nested);
					e.addSuppressed(nested);
				}
			}
			throw e;
		}
	}
 类似资料: