Guzzle/promises源码解读

陆曜文
2023-12-01

最近在读guzzle/promise的源码,觉得有点绕,所以这里记录一下。

promise介绍

promise是一个异步操作的标准,它规定了应当如何写一系列相关的异步操作,主要目的是规范化异步操作,避免大量的异步操作导致的回调地域。

Guzzle中的promise

promise是如何规范异步操作的

  • 异步操作的状态只有三种:pending-等待,fulfilled-成功,rejected-失败
  • 异步状态的变化只有两种:pending->fullfilled,pending->rejected
  • promise是可以传递的,也就是说promise A的结果可以是另一个promise B,那么A的成功与否,就要取决于B,B成功了,A就成功了,B失败了,A就失败了
  • 对结果成功与否的操作封装在onFulfilled和onRejected两个函数中
  • 可以手动的调用resolve达成promise,也可以手动调用reject拒绝promise
  • 如果需要同步的等待异步操作完成,可以调用wait方法,但需要在wait方法中达成promise或者拒绝promise
  • 也可以直接cancel某个promise并做相关操作(比reject多的就是可以附加某些相关操作,比如关闭socket链接、数据库链接等)

guzzle中是如何实现异步的

理解了promise的概念,最关键的一个问题应该是php中如何实现异步操作。
我们都知道php是进程级别的,当然也可以使用pthread来引入线程,但引入的线程也是有缺陷的(对资源类型的操作会有风险),且连作者都不推荐在cgi模式下运行。那么在php中如何实现异步操作呢?
guzzle中使用的是队列+register_shutdown_function的方法。
简单的讲,就是将异步操作的具体操作集中到一个队列中,然后注册到register_shutdown_function,在脚本执行完毕时依顺序执行队列中的操作。感觉很有trick…具体代码在TaskQueue中:

if ($withShutdown) {
    register_shutdown_function(function () {
        if ($this->enableShutdown) {
            // Only run the tasks if an E_ERROR didn't occur.
            $err = error_get_last();
            if (!$err || ($err['type'] ^ E_ERROR)) {
                $this->run();
            }
        }
    });
}

如何规定一个promise

在guzzle的promise中,一个promise包含6个属性:

  • state:当前promise的状态
  • result:promise的结果,包括成功的结果和拒绝的结果,可以是另一个promise
  • watiFn:调用wait时执行的function,在构造对象时传入
  • cancelFn:调用cancel时执行的function,在构造对象时传入
  • waitList:当前promise所依赖的promise
  • handler:当前promise保存的操作

类中的方法就很多了,这里不一一介绍,还是介绍几个关键的function。

construct

在构造方法中,需要传入waitFn和cancelFn,当然你也可以不传入。这个方法没什么好说的。

then

这是比较关键的一个方法,它揭示了如何实现链式操作和promise是如何传递的。

// 如果状态是pending,说明仍然没有结果,那么将继续注册onFulfilled和onRejected
if ($this->state === self::PENDING) {
	// 每次都是创建一个新的promise
    $p = new Promise(null, [$this, 'cancel']);
    // 保存每次注册的方法
    $this->handlers[] = [$p, $onFulfilled, $onRejected];
    $p->waitList = $this->waitList;
    // 保存新创建的promise
    $p->waitList[] = $this;
    return $p;
}

// 如果已经是成功状态,直接执行所有成功操作
if ($this->state === self::FULFILLED) {
    $promise = Create::promiseFor($this->result);
    return $onFulfilled ? $promise->then($onFulfilled) : $promise;
}

// 如果已经是失败状态,直接执行所有失败操作
$rejection = Create::rejectionFor($this->result);
return $onRejected ? $rejection->then(null, $onRejected) : $rejection;
resolve/reject/settle

将resolve和reject都封装成settle的调用,首先通过下列代码控制了状态的变化规则:

if ($this->state !== self::PENDING) {
    // Ignore calls with the same resolution.
    if ($state === $this->state && $value === $this->result) {
        return;
    }
    throw $this->state === $state
        ? new \LogicException("The promise is already {$state}.")
        : new \LogicException("Cannot change a {$this->state} promise to {$state}");
}

后续再根据$value(settle时传入的参数,也就是调用resolve和reject时传入的参数)的类型决定下一步操作:

// 如果value不是promise,则根据state注册操作到queue中
if (!is_object($value) || !method_exists($value, 'then')) {
    $id = $state === self::FULFILLED ? 1 : 2;
    Utils::queue()->add(...);
} elseif ($value instanceof Promise && Is::pending($value)) {
    // 如果是promise且还在pending,则合并操作
    $value->handlers = array_merge($value->handlers, $handlers);
} else {
    // 如果是promise且不再pending,则调用$value的then方法,先解决$value
    $value->then(
        ...,
        ...
    );
}

对promise这块的理解,我还不是很透彻,先记录到这,后面如果有更新,我在补充上。

 类似资料: