当前位置: 首页 > 编程笔记 >

PHP pthreads v3下worker和pool的使用方法示例

宰父淳
2023-03-14
本文向大家介绍PHP pthreads v3下worker和pool的使用方法示例,包括了PHP pthreads v3下worker和pool的使用方法示例的使用技巧和注意事项,需要的朋友参考一下

本文实例讲述了PHP pthreads v3下worker和pool的使用方法。分享给大家供大家参考,具体如下:

有些人会想,明明用thread已经可以很好的工作了,为什么还要搞个worker和pool?

之所以要用到worker和pool还是因为效率,因为系统创建一个新线程代价是比较昂贵,每个创建的线程会复制当前执行的整个上下文。

尽可能的重用线程可以让我们的程序更高效。

一个简单的worker例子:

<?php
//创建自定义work类,给work取个名字,方便查看
class Work extends Worker
{
  private $name;

  public function __construct($name)
  {
    $this->name = $name;
  }

  public function getName()
  {
    return $this->name;
  }
}

class Task extends Thread
{
  private $num;

  public function __construct($num)
  {
    $this->num = $num;
  }

  public function run()
  {
    //计算累加和
    $total = 0;
    for ($i = 0; $i < $this->num; $i++) {
      $total += $i;
    }
    echo "work : {$this->worker->getName()} task : {$total} \n";
    sleep(1);
  }
}

//创建一个worker线程
$work = new Work('a');

$work->start();

for ($i = 1; $i <= 10; $i++) {
  //将Task对象压栈到worker线程中
  //这个时候Task对象就可以使用worker线程上下文(变量,函数等)
  $work->stack(new Task($i));
}

//循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
while ($work->collect()) ;

//关闭worker
$work->shutdown();

上面代码在运行的时候,计算结果会每隔一秒出来一条,也就是10个task对象是运行在1个worker线程上的。

如果10个task对象是分别在独立空间运行的,sleep()函数就不会起作用,他们各自sleep并不会影响其他线程。

把上面的代码修改一下:

<?php
//创建自定义work类,给work取个名字,方便查看
class Work extends Worker
{
  private $name;

  public function __construct($name)
  {
    $this->name = $name;
  }

  public function getName()
  {
    return $this->name;
  }
}

class Task extends Thread
{
  private $num;

  public function __construct($num)
  {
    $this->num = $num;
  }

  public function run()
  {
    //计算累加和
    $total = 0;
    for ($i = 0; $i < $this->num; $i++) {
      $total += $i;
    }
    echo "work : {$this->worker->getName()} task : {$total} \n";
    sleep(1);
  }
}

//创建二个worker线程
$work1 = new Work('a');
$work2 = new Work('b');

$work1->start();
$work2->start();

for ($i = 1; $i <= 10; $i++) {
  if ($i <= 5) {
    $work1->stack(new Task($i));
  } else {
    $work2->stack(new Task($i));
  }
}

//循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
while ($work1->collect() || $work2->collect()) ;

//关闭worker
$work1->shutdown();
$work2->shutdown();

这里我们创建2个worker线程,让10个task对象分别压栈到2个worker中。

这时可以看到,计算结果是一对一对的出来,说明10个task对象跑在了2个worker线程上。

至于需要创建多少个worker线程,和多少个task对象,就看自已的需求了。

worker还有一个好处就是可以重用worker中的对象和方法。我们可以在worker中创建一个连接数据库对象,方便各task调用。

<?php
class DB extends Worker
{
  //注意这里设置为静态成员,pdo连接本身是不能在上下文中共享的
  //声明为静态成员,让每个worker有自已的pdo连接
  private static $db = null;
  public $msg = 'i from db';

  public function run()
  {
    self::$db = new PDO('mysql:host=192.168.33.226;port=3306;dbname=test;charset=utf8', 'root', '');
  }

  public function getDb()
  {
    return self::$db;
  }
}

class Task extends Thread
{
  private $id;
  //注意,这里不要给成员设置默认值,$result成员是线程对象是不可变的,不能被改写
  private $result;

  public function __construct($id)
  {
    $this->id = $id;
  }

  public function run()
  {
    //获取worker中的数据库连接
    $db = $this->worker->getDb();
    $ret = $db->query("select * from tb_user where id = {$this->id}");
    $this->result = $ret->fetch(PDO::FETCH_ASSOC);
    //访问worker中的成员变量msg
    echo "data : {$this->result['id']} {$this->result['name']} \t worker data : {$this->worker->msg} \n";
  }
}

//创建一个worker线程
$work = new DB();

$work->start();

for ($i = 1; $i <= 5; $i++) {
  $work->stack(new Task($i));
}

//循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
while ($work->collect()) ;

//关闭worker
$work->shutdown();

tb_user表大家可以随意创建,我这里为了演示只创建了id和name字段

运行结果如下:

 

如果说worker是对线程的重用,那么pool就是对worker更高的抽象了,可以同时管理多个worker。

<?php
//之所以要创建一个Id线程类,主要是为了给work取个不同的ID,方便查看,哪些task线程属于哪个work中
class Id extends Thread
{
  private $id;

  public function getId()
  {
    //防止出现id混乱,这里使用同步操作
    $this->synchronized(function () {
      ++$this->id;
    });
    return $this->id;
  }
}

class Work extends Worker
{
  private $id;

  public function __construct(Id $obj)
  {
    $this->id = $obj->getId();
  }

  public function getId()
  {
    return $this->id;
  }
}

class Task extends Thread
{
  private $num = 0;

  public function __construct($num)
  {
    $this->num = $num;
  }

  //计算累加和
  public function run()
  {
    $total = 0;
    for ($i = 0; $i < $this->num; $i++) {
      $total += $i;
    }
    echo "work id : {$this->worker->getId()} task : {$total} \n";
  }
}

//创建pool,可容纳3个work对象
$pool = new Pool(3, 'Work', [new Id()]);

//循环的把20个task线程提交到pool中的work对象上运行
for ($i = 1; $i <= 20; $i++) {
  $pool->submit(new Task($i));
}

//循环的清理任务,会阻塞主线程,直到任务都执行完毕
while ($pool->collect()) ;

//关闭pool
$pool->shutdown();

运行结果如下:

更多关于PHP相关内容感兴趣的读者可查看本站专题:《PHP进程与线程操作技巧总结》、《PHP网络编程技巧总结》、《PHP基本语法入门教程》、《PHP数组(Array)操作技巧大全》、《phphtml" target="_blank">字符串(string)用法总结》、《php+mysql数据库操作入门教程》及《php常见数据库操作技巧汇总》

希望本文所述对大家PHP程序设计有所帮助。

 类似资料:
  • Worker Pool 是一个Erlang进程池,其中的工作进程是Erlang的gen server模式进程。 Worker Pool的目标是非常简单: 提供以透明的方式管理一批工作进程并且对分配到池中的任务尽最大努力实现负载均衡。 一个 Echo 服务器示例: -module(echo_server).-author('elbrujohalcon@inaka.net').-behaviour(g

  • 本文向大家介绍MariaDB中的thread pool详细介绍和使用方法,包括了MariaDB中的thread pool详细介绍和使用方法的使用技巧和注意事项,需要的朋友参考一下 Thread pool是什么 MySQL是每个连接上来都要创建一个线程来执行语句。这样每一个新的连接进来即会创建一个新的线程,这种动作对MySQL本身压力比较大。Threadpool是提供一种线程代理的模型执行每个连接的

  • 如果一个worker奔溃,所有分配给它的未完成的任务必须被重新分配。第一个要求就是给master能够检测worker奔溃的能力。master必须能够检测到何时一个worker奔溃了,同时也能确定哪些worker能够执行它的任务。当一个worker奔溃时,它可能最终部分的执行了任务,或者完全执行完了任务但是还未来得及上报结果。如果计算还有副作用的话,为了清理状态某些恢复步骤可能就很有必要了。

  • 我需要编写静态方法来显示ProgressDialog,在线程中运行worker,并且worker完成后ProgreesDialog必须取消并且我的方法返回worker所做的操作。这是我的代码: 一切正常,但ProgressDialog不会取消。我怀疑looper.quit()做了什么坏事,但我需要停止looper.loop()。我调用loop(),因为我需要立即显示ProgressDialog。

  • 本文向大家介绍PHP pthreads v3下的Volatile简介与使用方法示例,包括了PHP pthreads v3下的Volatile简介与使用方法示例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了PHP pthreads v3下的Volatile简介与使用方法。分享给大家供大家参考,具体如下: 由于pthreads v3中引入了Threaded对象自动不变性的概念,所以当我们在构

  • 本文向大家介绍Python使用redis pool的一种单例实现方式,包括了Python使用redis pool的一种单例实现方式的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python使用redis pool的一种单例实现方式。分享给大家供大家参考,具体如下: 为适应多个redis实例共享同一个连接池的场景,可以类似于以下单例方式实现: 更多关于Python相关内容感兴趣的读者可查看