php多线程

阎彬炳
2023-12-01

PHP 高级编程之多线程

 

转自:https://my.oschina.net/neochen/blog/294354

 

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

 
  1. ./configure --prefix=/srv/php-5.5.9 \

  2. --with-config-file-path=/srv/php-5.5.9/etc \

  3. --with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \

  4. --enable-fpm \

  5. --with-fpm-user=www \

  6. --with-fpm-group=www \

  7. --with-pear \

  8. --with-curl \

  9. --with-gd \

  10. --with-jpeg-dir \

  11. --with-png-dir \

  12. --with-freetype-dir \

  13. --with-zlib-dir \

  14. --with-iconv \

  15. --with-mcrypt \

  16. --with-mhash \

  17. --with-pdo-mysql \

  18. --with-mysql-sock=/var/lib/mysql/mysql.sock \

  19. --with-openssl \

  20. --with-xsl \

  21. --with-recode \

  22. --enable-sockets \

  23. --enable-soap \

  24. --enable-mbstring \

  25. --enable-gd-native-ttf \

  26. --enable-zip \

  27. --enable-xml \

  28. --enable-bcmath \

  29. --enable-calendar \

  30. --enable-shmop \

  31. --enable-dba \

  32. --enable-wddx \

  33. --enable-sysvsem \

  34. --enable-sysvshm \

  35. --enable-sysvmsg \

  36. --enable-opcache \

  37. --enable-pcntl \

  38. --enable-maintainer-zts \

  39. --disable-debug

编译必须启用zts支持否则无法安装 pthreads(--enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

查看pthreads是否已经安装

# php -m | grep pthreads

2. Thread

 
  1. <?php

  2. class HelloWorld extends Thread {

  3. public function __construct($world) {

  4. $this->world = $world;

  5. }

  6.  
  7. public function run() {

  8. print_r(sprintf("Hello %s\n", $this->world));

  9. }

  10. }

  11.  
  12. $thread = new HelloWorld("World");

  13.  
  14. if ($thread->start()) {

  15. printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());

  16. }

  17. ?>

3. Worker 与 Stackable

 
  1. <?php

  2. class SQLQuery extends Stackable {

  3.  
  4. public function __construct($sql) {

  5. $this->sql = $sql;

  6. }

  7.  
  8. public function run() {

  9. $dbh = $this->worker->getConnection();

  10. $row = $dbh->query($this->sql);

  11. while($member = $row->fetch(PDO::FETCH_ASSOC)){

  12. print_r($member);

  13. }

  14. }

  15.  
  16. }

  17.  
  18. class ExampleWorker extends Worker {

  19. public static $dbh;

  20. public function __construct($name) {

  21. }

  22.  
  23. /*

  24. * The run method should just prepare the environment for the work that is coming ...

  25. */

  26. public function run(){

  27. self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');

  28. }

  29. public function getConnection(){

  30. return self::$dbh;

  31. }

  32. }

  33.  
  34. $worker = new ExampleWorker("My Worker Thread");

  35.  
  36. $work=new SQLQuery('select * from members order by id desc limit 5');

  37. $worker->stack($work);

  38.  
  39. $table1 = new SQLQuery('select * from demousers limit 2');

  40. $worker->stack($table1);

  41.  
  42. $worker->start();

  43. $worker->shutdown();

  44. ?>

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

 
  1. <?php

  2. $counter = 0;

  3. //$handle=fopen("php://memory", "rw");

  4. //$handle=fopen("php://temp", "rw");

  5. $handle=fopen("/tmp/counter.txt", "w");

  6. fwrite($handle, $counter );

  7. fclose($handle);

  8.  
  9. class CounterThread extends Thread {

  10. public function __construct($mutex = null){

  11. $this->mutex = $mutex;

  12. $this->handle = fopen("/tmp/counter.txt", "w+");

  13. }

  14. public function __destruct(){

  15. fclose($this->handle);

  16. }

  17. public function run() {

  18. if($this->mutex)

  19. $locked=Mutex::lock($this->mutex);

  20.  
  21. $counter = intval(fgets($this->handle));

  22. $counter++;

  23. rewind($this->handle);

  24. fputs($this->handle, $counter );

  25. printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

  26.  
  27. if($this->mutex)

  28. Mutex::unlock($this->mutex);

  29. }

  30. }

  31.  
  32. //没有互斥锁

  33. for ($i=0;$i<50;$i++){

  34. $threads[$i] = new CounterThread();

  35. $threads[$i]->start();

  36.  
  37. }

  38.  
  39. //加入互斥锁

  40. $mutex = Mutex::create(true);

  41. for ($i=0;$i<50;$i++){

  42. $threads[$i] = new CounterThread($mutex);

  43. $threads[$i]->start();

  44.  
  45. }

  46.  
  47. Mutex::unlock($mutex);

  48. for ($i=0;$i<50;$i++){

  49. $threads[$i]->join();

  50. }

  51. Mutex::destroy($mutex);

  52.  
  53. ?>

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

 
  1. <?php

  2. $tmp = tempnam(__FILE__, 'PHP');

  3. $key = ftok($tmp, 'a');

  4.  
  5. $shmid = shm_attach($key);

  6. $counter = 0;

  7. shm_put_var( $shmid, 1, $counter );

  8.  
  9. class CounterThread extends Thread {

  10. public function __construct($shmid){

  11. $this->shmid = $shmid;

  12. }

  13. public function run() {

  14.  
  15. $counter = shm_get_var( $this->shmid, 1 );

  16. $counter++;

  17. shm_put_var( $this->shmid, 1, $counter );

  18.  
  19. printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

  20. }

  21. }

  22.  
  23. for ($i=0;$i<100;$i++){

  24. $threads[] = new CounterThread($shmid);

  25. }

  26. for ($i=0;$i<100;$i++){

  27. $threads[$i]->start();

  28.  
  29. }

  30.  
  31. for ($i=0;$i<100;$i++){

  32. $threads[$i]->join();

  33. }

  34. shm_remove( $shmid );

  35. shm_detach( $shmid );

  36. ?>

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

 
  1. <?php

  2. $tmp = tempnam(__FILE__, 'PHP');

  3. $key = ftok($tmp, 'a');

  4.  
  5. $shmid = shm_attach($key);

  6. $counter = 0;

  7. shm_put_var( $shmid, 1, $counter );

  8.  
  9. class CounterThread extends Thread {

  10. public function __construct($shmid){

  11. $this->shmid = $shmid;

  12. }

  13. public function run() {

  14.  
  15. $this->synchronized(function($thread){

  16. $thread->wait();

  17. }, $this);

  18.  
  19. $counter = shm_get_var( $this->shmid, 1 );

  20. $counter++;

  21. shm_put_var( $this->shmid, 1, $counter );

  22.  
  23. printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);

  24. }

  25. }

  26.  
  27. for ($i=0;$i<100;$i++){

  28. $threads[] = new CounterThread($shmid);

  29. }

  30. for ($i=0;$i<100;$i++){

  31. $threads[$i]->start();

  32.  
  33. }

  34.  
  35. for ($i=0;$i<100;$i++){

  36. $threads[$i]->synchronized(function($thread){

  37. $thread->notify();

  38. }, $threads[$i]);

  39. }

  40.  
  41. for ($i=0;$i<100;$i++){

  42. $threads[$i]->join();

  43. }

  44. shm_remove( $shmid );

  45. shm_detach( $shmid );

  46. ?>

6. 线程池

6.1. 线程池

自行实现一个Pool类

 
  1. <?php

  2. class Update extends Thread {

  3.  
  4. public $running = false;

  5. public $row = array();

  6. public function __construct($row) {

  7.  
  8. $this->row = $row;

  9. $this->sql = null;

  10. }

  11.  
  12. public function run() {

  13.  
  14. if(strlen($this->row['bankno']) > 100 ){

  15. $bankno = safenet_decrypt($this->row['bankno']);

  16. }else{

  17. $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);

  18. file_put_contents("bankno_error.log", $error, FILE_APPEND);

  19. }

  20.  
  21. if( strlen($bankno) > 7 ){

  22. $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

  23.  
  24. $this->sql = $sql;

  25. }

  26.  
  27. printf("%s\n",$this->sql);

  28. }

  29.  
  30. }

  31.  
  32. class Pool {

  33. public $pool = array();

  34. public function __construct($count) {

  35. $this->count = $count;

  36. }

  37. public function push($row){

  38. if(count($this->pool) < $this->count){

  39. $this->pool[] = new Update($row);

  40. return true;

  41. }else{

  42. return false;

  43. }

  44. }

  45. public function start(){

  46. foreach ( $this->pool as $id => $worker){

  47. $this->pool[$id]->start();

  48. }

  49. }

  50. public function join(){

  51. foreach ( $this->pool as $id => $worker){

  52. $this->pool[$id]->join();

  53. }

  54. }

  55. public function clean(){

  56. foreach ( $this->pool as $id => $worker){

  57. if(! $worker->isRunning()){

  58. unset($this->pool[$id]);

  59. }

  60. }

  61. }

  62. }

  63.  
  64. try {

  65. $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(

  66. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  67. PDO::MYSQL_ATTR_COMPRESS => true

  68. )

  69. );

  70.  
  71. $sql = "select id,bankno from members order by id desc";

  72. $row = $dbh->query($sql);

  73. $pool = new Pool(5);

  74. while($member = $row->fetch(PDO::FETCH_ASSOC))

  75. {

  76.  
  77. while(true){

  78. if($pool->push($member)){ //压入任务到池中

  79. break;

  80. }else{ //如果池已经满,就开始启动线程

  81. $pool->start();

  82. $pool->join();

  83. $pool->clean();

  84. }

  85. }

  86. }

  87. $pool->start();

  88. $pool->join();

  89.  
  90. $dbh = null;

  91.  
  92. } catch (Exception $e) {

  93. echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";

  94. }

  95. ?>

6.2. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

 
  1. <?php

  2. class Update extends Thread {

  3.  
  4. public $running = false;

  5. public $row = array();

  6. public function __construct($row) {

  7.  
  8. $this->row = $row;

  9. $this->sql = null;

  10. //print_r($this->row);

  11. }

  12.  
  13. public function run() {

  14.  
  15. if(strlen($this->row['bankno']) > 100 ){

  16. $bankno = safenet_decrypt($this->row['bankno']);

  17. }else{

  18. $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);

  19. file_put_contents("bankno_error.log", $error, FILE_APPEND);

  20. }

  21.  
  22. if( strlen($bankno) > 7 ){

  23. $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);

  24.  
  25. $this->sql = $sql;

  26. }

  27.  
  28. printf("%s\n",$this->sql);

  29. }

  30.  
  31. }

  32.  
  33.  
  34.  
  35. try {

  36. $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(

  37. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  38. PDO::MYSQL_ATTR_COMPRESS => true

  39. )

  40. );

  41.  
  42. $sql = "select id,bankno from members order by id desc limit 50";

  43.  
  44. $row = $dbh->query($sql);

  45. $pool = array();

  46. while($member = $row->fetch(PDO::FETCH_ASSOC))

  47. {

  48. $id = $member['id'];

  49. while (true){

  50. if(count($pool) < 5){

  51. $pool[$id] = new Update($member);

  52. $pool[$id]->start();

  53. break;

  54. }else{

  55. foreach ( $pool as $name => $worker){

  56. if(! $worker->isRunning()){

  57. unset($pool[$name]);

  58. }

  59. }

  60. }

  61. }

  62.  
  63. }

  64.  
  65. $dbh = null;

  66.  
  67. } catch (Exception $e) {

  68. echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";

  69. }

  70. ?>

6.3. pthreads Pool类

pthreads 提供的 Pool class 例子

 
  1. <?php

  2.  
  3. class WebWorker extends Worker {

  4.  
  5. public function __construct(SafeLog $logger) {

  6. $this->logger = $logger;

  7. }

  8.  
  9. protected $loger;

  10. }

  11.  
  12. class WebWork extends Stackable {

  13.  
  14. public function isComplete() {

  15. return $this->complete;

  16. }

  17.  
  18. public function run() {

  19. $this->worker

  20. ->logger

  21. ->log("%s executing in Thread #%lu",

  22. __CLASS__, $this->worker->getThreadId());

  23. $this->complete = true;

  24. }

  25.  
  26. protected $complete;

  27. }

  28.  
  29. class SafeLog extends Stackable {

  30.  
  31. protected function log($message, $args = []) {

  32. $args = func_get_args();

  33.  
  34. if (($message = array_shift($args))) {

  35. echo vsprintf(

  36. "{$message}\n", $args);

  37. }

  38. }

  39. }

  40.  
  41.  
  42. $pool = new Pool(8, \WebWorker::class, [new SafeLog()]);

  43.  
  44. $pool->submit($w=new WebWork());

  45. $pool->submit(new WebWork());

  46. $pool->submit(new WebWork());

  47. $pool->submit(new WebWork());

  48. $pool->submit(new WebWork());

  49. $pool->submit(new WebWork());

  50. $pool->submit(new WebWork());

  51. $pool->submit(new WebWork());

  52. $pool->submit(new WebWork());

  53. $pool->submit(new WebWork());

  54. $pool->submit(new WebWork());

  55. $pool->submit(new WebWork());

  56. $pool->submit(new WebWork());

  57. $pool->submit(new WebWork());

  58. $pool->shutdown();

  59.  
  60. $pool->collect(function($work){

  61. return $work->isComplete();

  62. });

  63.  
  64. var_dump($pool);

7. 多线程文件安全读写(文件锁)

文件所种类。

 
  1. LOCK_SH 取得共享锁定(读取的程序)。

  2. LOCK_EX 取得独占锁定(写入的程序。

  3. LOCK_UN 释放锁定(无论共享或独占)。

  4. LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

 
  1. <?php

  2.  
  3. $fp = fopen("/tmp/lock.txt", "r+");

  4.  
  5. if (flock($fp, LOCK_EX)) { // 进行排它型锁定

  6. ftruncate($fp, 0); // truncate file

  7. fwrite($fp, "Write something here\n");

  8. fflush($fp); // flush output before releasing the lock

  9. flock($fp, LOCK_UN); // 释放锁定

  10. } else {

  11. echo "Couldn't get the lock!";

  12. }

  13.  
  14. fclose($fp);

  15.  
  16. ?>

共享锁例子2

 
  1. <?php

  2. $fp = fopen('/tmp/lock.txt', 'r+');

  3.  
  4. /* Activate the LOCK_NB option on an LOCK_EX operation */

  5. if(!flock($fp, LOCK_EX | LOCK_NB)) {

  6. echo 'Unable to obtain lock';

  7. exit(-1);

  8. }

  9.  
  10. /* ... */

  11.  
  12. fclose($fp);

  13. ?>

8. 多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

8.1. Worker 与 PDO

 
  1. <?php

  2. class Work extends Stackable {

  3.  
  4. public function __construct() {

  5. }

  6.  
  7. public function run() {

  8. $dbh = $this->worker->getConnection();

  9. $sql = "select id,name from members order by id desc limit 50";

  10. $row = $dbh->query($sql);

  11. while($member = $row->fetch(PDO::FETCH_ASSOC)){

  12. print_r($member);

  13. }

  14. }

  15.  
  16. }

  17.  
  18. class ExampleWorker extends Worker {

  19. public static $dbh;

  20. public function __construct($name) {

  21. }

  22.  
  23. /*

  24. * The run method should just prepare the environment for the work that is coming ...

  25. */

  26. public function run(){

  27. self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456');

  28. }

  29. public function getConnection(){

  30. return self::$dbh;

  31. }

  32. }

  33.  
  34. $worker = new ExampleWorker("My Worker Thread");

  35.  
  36. $work=new Work();

  37. $worker->stack($work);

  38.  
  39. $worker->start();

  40. $worker->shutdown();

  41. ?>

8.2. Pool 与 PDO

在线程池中链接数据库

 
  1. # cat pool.php

  2. <?php

  3. class ExampleWorker extends Worker {

  4.  
  5. public function __construct(Logging $logger) {

  6. $this->logger = $logger;

  7. }

  8.  
  9. protected $logger;

  10. }

  11.  
  12. /* the collectable class implements machinery for Pool::collect */

  13. class Work extends Stackable {

  14. public function __construct($number) {

  15. $this->number = $number;

  16. }

  17. public function run() {

  18. $dbhost = 'db.example.com'; // 数据库服务器

  19. $dbuser = 'example.com'; // 数据库用户名

  20. $dbpw = 'password'; // 数据库密码

  21. $dbname = 'example_real';

  22. $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

  23. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  24. PDO::MYSQL_ATTR_COMPRESS => true,

  25. PDO::ATTR_PERSISTENT => true

  26. )

  27. );

  28. $sql = "select OPEN_TIME, `COMMENT` from MT4_TRADES where LOGIN='".$this->number['name']."' and CMD='6' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";

  29. #echo $sql;

  30. $row = $dbh->query($sql);

  31. $mt4_trades = $row->fetch(PDO::FETCH_ASSOC);

  32. if($mt4_trades){

  33.  
  34. $row = null;

  35.  
  36. $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt4_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";

  37. $dbh->query($sql);

  38. #printf("%s\n",$sql);

  39. }

  40. $dbh = null;

  41. printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);

  42.  
  43. }

  44. }

  45.  
  46. class Logging extends Stackable {

  47. protected static $dbh;

  48. public function __construct() {

  49. $dbhost = 'db.example.com'; // 数据库服务器

  50. $dbuser = 'example.com'; // 数据库用户名

  51. $dbpw = 'password'; // 数据库密码

  52. $dbname = 'example_real'; // 数据库名

  53.  
  54. self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

  55. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  56. PDO::MYSQL_ATTR_COMPRESS => true

  57. )

  58. );

  59.  
  60. }

  61. protected function log($message, $args = []) {

  62. $args = func_get_args();

  63.  
  64. if (($message = array_shift($args))) {

  65. echo vsprintf("{$message}\n", $args);

  66. }

  67. }

  68.  
  69. protected function getConnection(){

  70. return self::$dbh;

  71. }

  72. }

  73.  
  74. $pool = new Pool(200, \ExampleWorker::class, [new Logging()]);

  75.  
  76. $dbhost = 'db.example.com'; // 数据库服务器

  77. $dbuser = 'example.com'; // 数据库用户名

  78. $dbpw = 'password'; // 数据库密码

  79. $dbname = 'db_example';

  80. $dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

  81. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  82. PDO::MYSQL_ATTR_COMPRESS => true

  83. )

  84. );

  85. $sql = "select `order`,name from accounts where deposit_time is null order by id desc";

  86.  
  87. $row = $dbh->query($sql);

  88. while($account = $row->fetch(PDO::FETCH_ASSOC))

  89. {

  90. $pool->submit(new Work($account));

  91. }

  92.  
  93. $pool->shutdown();

  94.  
  95. ?>

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

 
  1. <?php

  2. class ExampleWorker extends Worker {

  3.  
  4. #public function __construct(Logging $logger) {

  5. # $this->logger = $logger;

  6. #}

  7.  
  8. #protected $logger;

  9. protected static $dbh;

  10. public function __construct() {

  11.  
  12. }

  13. public function run(){

  14. $dbhost = 'db.example.com'; // 数据库服务器

  15. $dbuser = 'example.com'; // 数据库用户名

  16. $dbpw = 'password'; // 数据库密码

  17. $dbname = 'example'; // 数据库名

  18.  
  19. self::$dbh = new PDO("mysql:host=$dbhost;port=3306;dbname=$dbname", $dbuser, $dbpw, array(

  20. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  21. PDO::MYSQL_ATTR_COMPRESS => true,

  22. PDO::ATTR_PERSISTENT => true

  23. )

  24. );

  25.  
  26. }

  27. protected function getInstance(){

  28. return self::$dbh;

  29. }

  30.  
  31. }

  32.  
  33. /* the collectable class implements machinery for Pool::collect */

  34. class Work extends Stackable {

  35. public function __construct($data) {

  36. $this->data = $data;

  37. #print_r($data);

  38. }

  39.  
  40. public function run() {

  41. #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

  42.  
  43. try {

  44. $dbh = $this->worker->getInstance();

  45. #print_r($dbh);

  46. $id = $this->data['id'];

  47. $mobile = safenet_decrypt($this->data['mobile']);

  48. #printf("%d, %s \n", $id, $mobile);

  49. if(strlen($mobile) > 11){

  50. $mobile = substr($mobile, -11);

  51. }

  52. if($mobile == 'null'){

  53. # $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";

  54. # printf("%s\n",$sql);

  55. # $dbh->query($sql);

  56. $mobile = '';

  57. $sql = "UPDATE members_digest SET mobile = :mobile where id = :id";

  58. }else{

  59. $sql = "UPDATE members_digest SET mobile = md5(:mobile) where id = :id";

  60. }

  61. $sth = $dbh->prepare($sql);

  62. $sth->bindValue(':mobile', $mobile);

  63. $sth->bindValue(':id', $id);

  64. $sth->execute();

  65. #echo $sth->debugDumpParams();

  66. }

  67. catch(PDOException $e) {

  68. $error = sprintf("%s,%s\n", $mobile, $id );

  69. file_put_contents("mobile_error.log", $error, FILE_APPEND);

  70. }

  71.  
  72. #$dbh = null;

  73. printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);

  74. #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);

  75. }

  76. }

  77.  
  78. $pool = new Pool(100, \ExampleWorker::class, []);

  79.  
  80. #foreach (range(0, 100) as $number) {

  81. # $pool->submit(new Work($number));

  82. #}

  83.  
  84. $dbhost = 'db.example.com'; // 数据库服务器

  85. $dbuser = 'example.com'; // 数据库用户名

  86. $dbpw = 'password'; // 数据库密码

  87. $dbname = 'example';

  88. $dbh = new PDO("mysql:host=$dbhost;port=3307;dbname=$dbname", $dbuser, $dbpw, array(

  89. PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',

  90. PDO::MYSQL_ATTR_COMPRESS => true

  91. )

  92. );

  93. #print_r($dbh);

  94.  
  95. #$sql = "select id, mobile from members where id < :id";

  96. #$sth = $dbh->prepare($sql);

  97. #$sth->bindValue(':id',300);

  98. #$sth->execute();

  99. #$result = $sth->fetchAll();

  100. #print_r($result);

  101. #

  102. #$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";

  103. #$sth = $dbh->prepare($sql);

  104. #$sth->bindValue(':mobile', 'aa');

  105. #$sth->bindValue(':id','272');

  106. #echo $sth->execute();

  107. #echo $sth->queryString;

  108. #echo $sth->debugDumpParams();

  109.  
  110.  
  111. $sql = "select id, mobile from members order by id asc"; // limit 1000";

  112. $row = $dbh->query($sql);

  113. while($members = $row->fetch(PDO::FETCH_ASSOC))

  114. {

  115. #$order = $account['order'];

  116. #printf("%s\n",$order);

  117. //print_r($members);

  118. $pool->submit(new Work($members));

  119. #unset($account['order']);

  120. }

  121.  
  122. $pool->shutdown();

  123.  
  124. ?>

8.3. 多线程中操作数据库总结

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

 
  1. <?php

  2. $dbh = new PDO('mysql:host=localhost;dbname=test', $user, $pass, array(

  3. PDO::ATTR_PERSISTENT => true

  4. ));

  5. ?>

9. Thread And ZeroMQ

应用场景,我使用触发器监控数据库某个表,一旦发现有改变就通知程序处理数据

9.1. 数据库端

首先安装ZeroMQ 与 MySQL UDF https://github.com/netkiller/mysql-zmq-plugin, 然后创建触发器。

 
  1. CREATE DEFINER=`dba`@`192.168.%` PROCEDURE `Table_Example`(IN `TICKET` INT, IN `LOGIN` INT, IN `CMD` INT, IN `VOLUME` INT)

  2. LANGUAGE SQL

  3. NOT DETERMINISTIC

  4. READS SQL DATA

  5. SQL SECURITY DEFINER

  6. COMMENT '交易监控'

  7. BEGIN

  8. DECLARE Example CHAR(1) DEFAULT 'N';

  9.  
  10. IF CMD IN ('0','1') THEN

  11. IF VOLUME >=10 AND VOLUME <=90 THEN

  12. select coding into Example from example.members where username = LOGIN and coding = 'Y';

  13. IF Example = 'Y' THEN

  14. select zmq_client('tcp://192.168.2.15:5555', CONCAT(TICKET, ',', LOGIN, ',', VOLUME));

  15. END IF;

  16. END IF;

  17. END IF;

  18. END

  19.  
  20. CREATE DEFINER=`dba`@`192.168.6.20` TRIGGER `Table_AFTER_INSERT` AFTER INSERT ON `MT4_TRADES` FOR EACH ROW BEGIN

  21. call Table_Example(NEW.TICKET,NEW.LOGIN,NEW.CMD,NEW.VOLUME);

  22. END

9.2. 数据处理端

 
  1. <?php

  2. class ExampleWorker extends Worker {

  3.  
  4. #public function __construct(Logging $logger) {

  5. # $this->logger = $logger;

  6. #}

  7.  
  8. #protected $logger;

  9. protected static $dbh;

  10. public function __construct() {

  11.  
  12. }

  13. public function run(){

  14. $dbhost = '192.168.2.1'; // 数据库服务器

  15. $dbport = 3306;

  16. $dbuser = 'www'; // 数据库用户名

  17. $dbpass = 'password'; // 数据库密码

  18. $dbname = 'example'; // 数据库名

  19.  
  20. self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(

  21. /* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */

  22. PDO::MYSQL_ATTR_COMPRESS => true,

  23. PDO::ATTR_PERSISTENT => true

  24. )

  25. );

  26.  
  27. }

  28. protected function getInstance(){

  29. return self::$dbh;

  30. }

  31.  
  32. }

  33.  
  34. /* the collectable class implements machinery for Pool::collect */

  35. class Fee extends Stackable {

  36. public function __construct($msg) {

  37. $trades = explode(",", $msg);

  38. $this->data = $trades;

  39. print_r($trades);

  40. }

  41.  
  42. public function run() {

  43. #$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );

  44.  
  45. try {

  46. $dbh = $this->worker->getInstance();

  47.  
  48. $insert = "INSERT INTO coding_fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";

  49. $sth = $dbh->prepare($insert);

  50. $sth->bindValue(':ticket', $this->data[0]);

  51. $sth->bindValue(':login', $this->data[1]);

  52. $sth->bindValue(':volume', $this->data[2]);

  53. $sth->execute();

  54. //$sth = null;

  55. //$dbh = null;

  56.  
  57. /* 业务实现在此处 */

  58.  
  59. $update = "UPDATE coding_fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";

  60. $sth = $dbh->prepare($update);

  61. $sth->bindValue(':ticket', $this->data[0]);

  62. $sth->execute();

  63. //echo $sth->queryString;

  64. }

  65. catch(PDOException $e) {

  66. $error = sprintf("%s,%s\n", $mobile, $id );

  67. file_put_contents("mobile_error.log", $error, FILE_APPEND);

  68. }

  69.  
  70. #$dbh = null;

  71. //printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);

  72. #printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);

  73. }

  74. }

  75.  
  76. class Example {

  77. /* config */

  78. const LISTEN = "tcp://192.168.2.15:5555";

  79. const MAXCONN = 100;

  80. const pidfile = __CLASS__;

  81. const uid = 80;

  82. const gid = 80;

  83.  
  84. protected $pool = NULL;

  85. protected $zmq = NULL;

  86. public function __construct() {

  87. $this->pidfile = '/var/run/'.self::pidfile.'.pid';

  88. }

  89. private function daemon(){

  90. if (file_exists($this->pidfile)) {

  91. echo "The file $this->pidfile exists.\n";

  92. exit();

  93. }

  94.  
  95. $pid = pcntl_fork();

  96. if ($pid == -1) {

  97. die('could not fork');

  98. } else if ($pid) {

  99. // we are the parent

  100. //pcntl_wait($status); //Protect against Zombie children

  101. exit($pid);

  102. } else {

  103. // we are the child

  104. file_put_contents($this->pidfile, getmypid());

  105. posix_setuid(self::uid);

  106. posix_setgid(self::gid);

  107. return(getmypid());

  108. }

  109. }

  110. private function start(){

  111. $pid = $this->daemon();

  112. $this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);

  113. $this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);

  114. $this->zmq->bind(self::LISTEN);

  115.  
  116. /* Loop receiving and echoing back */

  117. while ($message = $this->zmq->recv()) {

  118. if($message){

  119. $this->pool->submit(new Fee($message));

  120. $this->zmq->send('TRUE');

  121. }else{

  122. $this->zmq->send('FALSE');

  123. }

  124. }

  125. $pool->shutdown();

  126. }

  127. private function stop(){

  128.  
  129. if (file_exists($this->pidfile)) {

  130. $pid = file_get_contents($this->pidfile);

  131. posix_kill($pid, 9);

  132. unlink($this->pidfile);

  133. }

  134. }

  135. private function help($proc){

  136. printf("%s start | stop | help \n", $proc);

  137. }

  138. public function main($argv){

  139. if(count($argv) < 2){

  140. printf("please input help parameter\n");

  141. exit();

  142. }

  143. if($argv[1] === 'stop'){

  144. $this->stop();

  145. }else if($argv[1] === 'start'){

  146. $this->start();

  147. }else{

  148. $this->help($argv[0]);

  149. }

  150. }

  151. }

  152.  
  153. $example = new Example();

  154. $example->main($argv);

使用方法

 
  1. # php example.php start

  2. # php example.php stop

  3. # php example.php help

此程序涉及守候进程实现$this->daemon()运行后转到后台运行,进程ID保存,进程的互斥(不允许同时启动两个进程),线程池连接数以及线程任务等等

 类似资料: