当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ框架学习——Async I/O之Eventloop(二)

胡野
2023-12-01

2021SC@SDUSC

Eventloop是异步编程的内部类。在异步编程模型中,必须避免Eventloop线程中的阻塞操作(如I/O或长时间运行的计算)。应使用此类操作的异步版本。Eventloop表示只有一个阻塞操作的无限循环selector.select(),它选择一组键,这些键对应的通道已准备好进行I/O操作。使用从外部添加到Eventloop的带有任务的键和队列,它从方法run()中的一个线程开始异步执行,该方法被覆盖,因为Eventloop是Runnable的实现。当此eventloop没有选定的键且其包含任务的队列为空时,其工作将结束。

该文继上文来看看Eventloop的源代码部分。看看Eventloop的具体操作有哪些。

withEventloopFatalErrorHandler():在事件循环级别设置致命错误。它处理所有未由线程本地错误处理程序fatalErrorHandler处理的错误事件循环级别上的致命错误处理程序
	public @NotNull Eventloop withEventloopFatalErrorHandler(@NotNull FatalErrorHandler fatalErrorHandler) {
		this.eventloopFatalErrorHandler = fatalErrorHandler;
		return this;
	}

 

withThreadFatalErrorHandler():在线程级别设置致命错误这是将使用setThreadFatalErrorHandler(FatalErrorHandler)设置的处理程序。它通常是第一个处理“更靠近”抛出站点fatalErrorHandler的错误的处理程序线程级别上的致命错误处理程序
	public @NotNull Eventloop withThreadFatalErrorHandler(@Nullable FatalErrorHandler fatalErrorHandler) {
		this.threadFatalErrorHandler = fatalErrorHandler;
		return this;
	}
closeSelector():如果选择器已打开,则将其关闭。
	private void closeSelector() {
		if (selector != null) {
			try {
				selector.close();
				selector = null;
				cancelledKeys = 0;
			} catch (IOException e) {
				logger.error("Could not close selector", e);
			}
		}
	}
keepAlive(boolean keepAlive):设置标志(如果设置true),则表示即使所有任*都已执行,此Eventloop的工作仍将继续,并且它没有任何选定的键。用于设置的参数keepAlive标志
	public void keepAlive(boolean keepAlive) {
		this.keepAlive = keepAlive;
		if (!keepAlive && selector != null) {
			selector.wakeup();
		}
	}
run():此eventloop处于活动状态时执行任务的Runnable中重写的方法。
	@Override
	public void run() {
		eventloopThread = Thread.currentThread();
		if (threadName != null)
			eventloopThread.setName(threadName);
		if (threadPriority != 0)
			eventloopThread.setPriority(threadPriority);
		CURRENT_EVENTLOOP.set(this);
		if (threadFatalErrorHandler != null)
			setThreadFatalErrorHandler(threadFatalErrorHandler);
		ensureSelector();
		assert selector != null;
		breakEventloop = false;

		long timeAfterSelectorSelect;
		long timeAfterBusinessLogic = 0;
		while (isAlive()) {
			try {
				long selectTimeout = getSelectTimeout();
				if (inspector != null) inspector.onUpdateSelectorSelectTimeout(selectTimeout);
				if (selectTimeout <= 0) {
					lastSelectedKeys = selector.selectNow();
				} else {
					lastSelectedKeys = selector.select(selectTimeout);
				}
				cancelledKeys = 0;
			} catch (ClosedChannelException e) {
				logger.error("Selector is closed, exiting...", e);
				break;
			} catch (IOException e) {
				recordIoError(e, selector);
			}

			timeAfterSelectorSelect = refreshTimestampAndGet();
			int keys = processSelectedKeys(selector.selectedKeys());
			int concurrentTasks = executeConcurrentTasks();
			int scheduledTasks = executeScheduledTasks();
			int backgroundTasks = executeBackgroundTasks();
			int localTasks = executeLocalTasks();

			if (inspector != null) {
				if (timeAfterBusinessLogic != 0) {
					long selectorSelectTime = timeAfterSelectorSelect - timeAfterBusinessLogic;
					inspector.onUpdateSelectorSelectTime(selectorSelectTime);
				}

				timeAfterBusinessLogic = timestamp;
				boolean taskOrKeyPresent = (keys + concurrentTasks + scheduledTasks + backgroundTasks + localTasks) != 0;
				boolean externalTaskPresent = lastExternalTasksCount != 0;
				long businessLogicTime = timeAfterBusinessLogic - timeAfterSelectorSelect;
				inspector.onUpdateBusinessLogicTime(taskOrKeyPresent, externalTaskPresent, businessLogicTime);
			}

			loop++;
			tick = 0;
		}
		logger.info("{} finished", this);
		eventloopThread = null;
		if (selector != null && selector.isOpen() && selector.keys().stream().anyMatch(SelectionKey::isValid)) {
			logger.warn("Selector is still open, because event loop {} has {} keys", this, selector.keys());
			return;
		}
		closeSelector();
		if (threadFatalErrorHandler != null)
			setThreadFatalErrorHandler(null);
	}
processSelectedKeys():处理与各种I/O事件相关的选定键:接受、连接、读取、写入。包含所有选定键的selectedKeys集,从NIO Selector.select()返回。
	private int processSelectedKeys(@NotNull Set<SelectionKey> selectedKeys) {
		long startTimestamp = timestamp;
		Stopwatch sw = monitoring ? Stopwatch.createUnstarted() : null;

		int invalidKeys = 0, acceptKeys = 0, connectKeys = 0, readKeys = 0, writeKeys = 0;

		Iterator<SelectionKey> iterator = lastSelectedKeys != 0 ? selectedKeys.iterator() : emptyIterator();
		while (iterator.hasNext()) {
			SelectionKey key = iterator.next();
			iterator.remove();

			if (!key.isValid()) {
				invalidKeys++;
				continue;
			}

			if (sw != null) {
				sw.reset();
				sw.start();
			}

			if (key.isAcceptable()) {
				onAccept(key);
				acceptKeys++;
			} else if (key.isConnectable()) {
				onConnect(key);
				connectKeys++;
			} else {
				if (key.isReadable()) {
					onRead(key);
					readKeys++;
				}
				if (key.isValid()) {
					if (key.isWritable()) {
						onWrite(key);
						writeKeys++;
					}
				} else {
					invalidKeys++;
				}
			}
			if (sw != null && inspector != null) inspector.onUpdateSelectedKeyDuration(sw);
		}

		int keys = acceptKeys + connectKeys + readKeys + writeKeys + invalidKeys;

		if (keys != 0) {
			long loopTime = refreshTimestampAndGet() - startTimestamp;
			if (inspector != null)
				inspector.onUpdateSelectedKeysStats(lastSelectedKeys, invalidKeys, acceptKeys, connectKeys, readKeys, writeKeys, loopTime);
		}

		return keys;
	}

 类似资料: