admin管理员组

文章数量:1794759

PHP多线程编程

PHP多线程编程

from: wwwblogs/niniwzw/archive/2010/01/18/1651082.html

PHP多线程编程(一)

    虽然PHP 中,多线程用的比较的少。但是毕竟可能是会用到了。我最近就遇到这样一个问题,用户提交几百个url以后,要读出这个url 中的标题。

当然,你不希望用户等待的太久,10s 钟应该给出个答案。但是,本身,你要获取一个url 的标题,少的要 0.1s ,多的要好几秒。

显然,采用单个线程的方式是不行的。

 

    我的第一个设计方案是这样的:

   1. 用我前面提供的代码提供一个简单的服务器:  wwwblogs/niniwzw/archive/2009/09/27/1575002.html

   这个服务器的作用是:提供一个url,然后,就读取标题。这里,可以每次读128个字节,看看有没有读到title,如果读到title了就停止读了。

   这样可以省带宽。

 

   2. 在客户端,同时打开1百个 socket ,访问这个服务器。如果提供的url数目超过100,那么就多次运行。

   这个方案,基本上能够满足要求,读比较快的网页如:google 100次,也只要1s 左右。但是,通过测试,发现,有一定

   的概率在打开链接的时候被阻塞。(有时候会阻塞个1s左右,然后继续往下open)可能打开了太多的链接了,会出很大的问题。

 

   当然,这是一个很差的解决方案:建立tcp 链接本身的消耗非常的大。因为可靠有序传输的要求,要维持一个数据结构,而且,系统还要开辟一定的缓存给客户端和服务器端,

   用户缓存数据。如果建立上百个链接,就可能占用很大的内存。作为一个系统的服务,应该尽量的简单,就是,我叫你做什么事情,你做好以后,结果给我就可以了。

  

    一般来说,PHP要进行多线程编程,比较常见的是:

    1. 要进行大量的网络耗时的操作

    2. 要做大量的运算,并且,系统有多个cpu,为了让用户有更快的体验,把一个任务,分成几个小任务,最后合并。

   

    所以,应该尽量不要在调用的地方有太多复杂的逻辑,把逻辑内置在服务中。

 

   我的第二个设计方案是这样的:

   同样用上面的服务器,只是,这个服务器功能变了,接收不超过100个的url,然后打开100个子线程,下载title。最后合并,返回给客户端。

具体怎么编写这个服务器,在下一个部分讲。

   这个一测试,发现效率高了很多。而且也十分的稳定。下载一百下google 大概 0.7s。基本上不会超过1s,而原来的那个方案,经常超过5s(20%的可能性)

 

   当然,如果这样的设计方案只是一个很简单的解决方案。如果有很多人使用你的服务的情况下,肯定不能这样做。

   PHP做企业级别的开发,一个比较复杂的问题,就是多线程怎么处理。还有就是往往采用数组 会引起内存急剧膨胀。一般,数组处理10万条数据已经是极限,

在小网站开发很少会用到一次读取如此大的数据量,要是遇到了,最好通过C 扩展进行解决,否则,一次会损耗 几百M 的内存,10个人用就拖死你。

 

PHP多线程编程(二)管道通信

一个线程如果是个人英雄主义,那么多线程就是集体主义。(不严格区分多进程 和 多线程的差别)

你不再是一个独行侠,而是一个指挥家。

独来独往,非常自由自在,但是,很多时候,不如众人拾柴火焰高。

这就是我对多线程的理解。多线程编程的主要问题是:通信 和 同步问题。

更多PHP 多线程编程的背景知识见:

PHP多线程编程(一)

在PHP 中,如果光用pcntl ,实现比较简单的通信问题都是很困难的。

 

下面介绍管道通信:

1. 管道可以认为是一个队列,不同的线程都可以往里面写东西,也都可以从里面读东西。写就是

在队列末尾添加,读就是在队头删除。

 

2. 管道一般有大小,默认一般是4K,也就是内容超过4K了,你就只能读,不能往里面写了。

 

3. 默认情况下,管道写入以后,就会被阻止,直到读取他的程序读取把数据读完。而读取线程也会被阻止,

   直到有进程向管道写入数据。当然,你可以改变这样的默认属性,用stream_set_block  函数,设置成非阻断模式。

 

下面是我分装的一个管道的类(这个类命名有问题,没有统一,没有时间改成统一的了,我一般先写测试代码,最后分装,所以命名上可能不统一):

  • <?php
  • class Pipe
  • {
  •     public $fifoPath;
  •     private $w_pipe;
  •     private $r_pipe;
  •     /**
  •      * 自动创建一个管道
  •      *
  •      * @param string $name 管道名字
  •      * @param int $mode 管道的权限,默认任何用户组可以读写
  •      */
  •     function __construct($name = 'pipe', $mode = 0666)
  •     {
  •         $fifoPath = "/tmp/$name." . posix_getpid();
  •         if (!file_exists($fifoPath)) {
  •             if (!posix_mkfifo($fifoPath, $mode)) {
  •                 error("create new pipe ($name) error.");
  •                 return false;
  •             }
  •         } else {
  •             error( "pipe ($name) has exit.");
  •             return false;
  •         }
  •         $this->fifoPath = $fifoPath;
  •     }
  •    
  • ///
  • // 写管道函数开始
  • ///
  •     function open_write()
  •     {
  •         $this->w_pipe = fopen($this->fifoPath, 'w');
  •         if ($this->w_pipe == NULL) {
  •             error("open pipe {$this->fifoPath} for write error.");
  •             return false;
  •         }
  •         return true;
  •     }
  •     function write($data)
  •     {
  •         return fwrite($this->w_pipe, $data);
  •     }
  •     function write_all($data)
  •     {
  •         $w_pipe = fopen($this->fifoPath, 'w');
  •         fwrite($w_pipe, $data);
  •         fclose($w_pipe);
  •     }
  •     function close_write()
  •     {
  •         return fclose($this->w_pipe);
  •     }
  • /
  • /// 读管道相关函数开始
  •     function open_read()
  •     {
  •         $this->r_pipe = fopen($this->fifoPath, 'r');
  •         if ($this->r_pipe == NULL) {
  •             error("open pipe {$this->fifoPath} for read error.");
  •             return false;
  •         }
  •         return true;
  •     }
  •     function read($byte = 1024)
  •     {
  •         return fread($this->r_pipe, $byte);
  •     }
  •     function read_all()
  •     {
  •         $r_pipe = fopen($this->fifoPath, 'r');
  •         $data = '';
  •         while (!feof($r_pipe)) {
  •             //echo "read one K\\n";
  •             $data .= fread($r_pipe, 1024);
  •         }
  •         fclose($r_pipe);
  •         return $data;
  •     }
  •     function close_read()
  •     {
  •         return fclose($this->r_pipe);
  •     }
  •     /**
  •      * 删除管道
  •      *
  •      * @return boolean is success
  •      */
  •     function rm_pipe()
  •     {
  •         return unlink($this->fifoPath);
  •     }
  • }
  • ?>
  • 有了这个类,就可以实现简单的管道通信了,因为这个教程是多线程编程系列教程的一个部分。

    这个管道类的应用部分,将放到第三部分。

     

    PHP多线程编程(三)多线程抓取网页的演示

    要理解这个部分的代码,请阅读:

    用 Socket 和 Pcntl 实现一个多线程服务器(一)

    PHP多线程编程(一)

    PHP多线程编程(二)管道通信

     

    我们知道,从父进程到子经常的数据传递相对比较容易一些,但是从子进程传递到父进程就比较的困难。

    有很多办法实现进程交互,在php中比较方便的是 管道通信。当然,还可以通过 socket_pair 进行通信。

     

    首先是服务器为了应对每一个请求要做的事情(发送一个url 序列,url序列用\\t 分割。而结束标记是 \\n)

  • function clientHandle($msgsock, $obj)
  • {
  •     $nbuf = '';
  •     socket_set_block($msgsock);
  •     do {
  •         if (false === ($buf = @socket_read($msgsock, 2048, PHP_NORMAL_READ))) {
  •             $obj->error("socket_read() failed: reason: " . socket_strerror(socket_last_error($msgsock)));
  •             break;
  •         }
  •         $nbuf .= $buf;
  •         if (substr($nbuf, -1) != "\\n") {
  •             continue;
  •         }
  •         $nbuf = trim($nbuf);
  •         if ($nbuf == 'quit') {
  •             break;
  •         }
  •         if ($nbuf == 'shutdown') {
  •             break;
  •         }
  •         $url = explode("\\t", $nbuf);
  •         $nbuf = '';
  •         $talkback = serialize(read_ntitle($url));
  •         socket_write($msgsock, $talkback, strlen($talkback));
  •         debug("write to the client\\n");
  •         break;
  •     } while (true);
  • }
  • 上面代码比较关键的一个部分是 read_ntitle,这个函数实现多线程的读取标题。

     

    代码如下:(为每一个url fork 一个线程,然后打开管道 ,读取到的标题写入到管道里面去,主线程一直的在读取管道数据,直到所有的数据读取完毕,最后删除管道)

  • function read_ntitle($arr)
  • {
  •     $pipe = new Pipe("multi-read");
  •     foreach ($arr as $k => $item)
  •     {
  •         $pids[$k] = pcntl_fork();
  •         if(!$pids[$k])
  •         {
  •              $pipe->open_write();
  •              $pid = posix_getpid();
  •              $content = base64_encode(read_title($item));
  •              $pipe->write("$k,$content\\n");
  •              $pipe->close_write();
  •              debug("$k: write success!\\n");
  •              exit;
  •         }
  •     }
  •     debug("read begin!\\n");
  •     $data = $pipe->read_all();
  •     debug("read end!\\n");
  •     $pipe->rm_pipe();
  •     return parse_data($data);
  • }
  • parse_data 代码如下,非常的简单,就不说了。
  • function parse_data($data)
  • {
  •     $data = explode("\\n", $data);
  •     $new = array();
  •     foreach ($data as $value)
  •     {
  •         $value = explode(",", $value);
  •         if (count($value) == 2) {
  •             $value[1] = base64_decode($value[1]);
  •             $new[intval($value[0])] = $value[1];
  •         }
  •     }
  •     ksort($new, SORT_NUMERIC);
  •     return $new;
  • }
  • 上面代码中,还有一个函数read_title 比较有技巧。为了兼容性,我没有采用curl,而是直接采用socket 通信。

    在下载到 title 标签后,就停止读取内容,以节省时间。代码如下:

  • function read_title($url)
  • {
  •     $url_info = parse_url($url);
  •     if (!isset($url_info['host']) || !isset($url_info['scheme'])) {
  •      return false;
  •     }
  •     $host = $url_info['host'];
  •     
  •  $port = isset($url_info['port']) ? $url_info['port'] : null;
  •  $path = isset($url_info['path']) ? $url_info['path'] : "/";
  •  if(isset($url_info['query'])) $path .= "?".$url_info['query'];
  •  if(empty($port)){
  •   $port = 80;
  •  }
  •  if ($url_info['scheme'] == 'https'){
  •   $port = 443;
  •  }
  •  if ($url_info['scheme'] == 'http') {
  •   $port = 80;
  •  }
  •     $out = "GET $path HTTP/1.1\\r\\n";
  •     $out .= "Host: $host\\r\\n";
  •     $out .= "User-Agent: Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.7)\\r\\n";
  •     $out .= "Connection: Close\\r\\n\\r\\n";
  •     $fp = fsockopen($host, $port, $errno, $errstr, 5);
  •     if ($fp == NULL) {
  •      error("get title from $url, error. $errno: $errstr \\n");
  •      return false;
  •     }
  •     fwrite($fp, $out);
  •     $content = '';
  •     while (!feof($fp)) {
  •         $content .= fgets($fp, 1024);
  •         if (preg_match("/<title>(.*?)<\\/title>/is", $content, $matches)) {
  •              fclose($fp);
  •             return encode_to_utf8($matches[1]);
  •         }
  •     }
  •     fclose($fp);
  •     return false;
  • }
  • function encode_to_utf8($string)
  • {
  •      return mb_convert_encoding($string, "UTF-8", mb_detect_encoding($string, "UTF-8, GB2312, ISO-8859-1", true));
  • }
  • 这里,我只是检测了 三种最常见的编码。

    其他的代码都很简单,这些代码都是测试用的,如果你要做这样一个服务器,一定要进行优化处理。特别是,要防止一次打开太多的线程,你要做更多的处理。

    很多时候,我们抱怨php 不支持多线程,实际上,php是支持多线程的。当然,没有那么多的进程通信的选项,而多线程的核心就在于线程的通信与同步。

    在web开发中,这样的多线程基本上是不会使用的,因为有很严重的性能问题。要实现比较简单的多线程,高负载,必须借助其扩展。

     

    PHP多进程(四) 内部多进程

    上面一个系列的教程:

    用 Socket 和 Pcntl 实现一个多进程服务器(一)

    PHP多进程编程(一)

    PHP多进程编程(二)管道通信

    PHP多进程编程(三)多进程抓取网页的演示

     

    说的都是只兼容unix 服务器的多进程,下面来讲讲在window 和 unix 都兼容的多进程(这里是泛指,下面的curl实际上是通过IO复用实现的)。

        通过扩展实现多线程的典型例子是CURL,CURL 支持多线程的抓取网页的功能。

    这部分过于抽象,所以,我先给出一个CURL并行抓取多个网页内容的一个分装类。这个类实际上很实用,

    详细分析这些函数的内部实现将在下一个教程里面描述。

        你可能不能很好的理解这个类,而且,php curl 官方主页上都有很多错误的例子,在讲述了其内部机制

    后,你就能够明白了。

        先看代码:

  • <?php
  • class Http_MultiRequest
  • {
  •     //要并行抓取的url 列表
  •     private $urls = array();
  •     //curl 的选项
  •     private $options;
  •     
  •     //构造函数
  •     function __construct($options = array())
  •     {
  •         $this->setOptions($options);
  •     }
  •     //设置url 列表
  •     function setUrls($urls)
  •     {
  •         $this->urls = $urls;
  •         return $this;
  •     }
  •     //设置选项
  •     function setOptions($options)
  •     {
  •         $options[CURLOPT_RETURNTRANSFER] = 1;
  •         if (isset($options['HTTP_POST']))
  •         {
  •             curl_setopt($ch, CURLOPT_POST, 1);
  •             curl_setopt($ch, CURLOPT_POSTFIELDS, $options['HTTP_POST']);
  •             unset($options['HTTP_POST']);
  •         }
  •         if (!isset($options[CURLOPT_USERAGENT]))
  •         {
  •             $options[CURLOPT_USERAGENT] = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1;)';
  •         }
  •         if (!isset($options[CURLOPT_FOLLOWLOCATION]))
  •         {
  •             $options[CURLOPT_FOLLOWLOCATION] = 1;
  •         }
  •         if (!isset($options[CURLOPT_HEADER]))
  •         {
  •             $options[CURLOPT_HEADER] = 0;
  •         }
  •         $this->options = $options;
  •     }
  •     //并行抓取所有的内容
  •     function exec()
  •     {
  •         if(empty($this->urls) || !is_array($this->urls))
  •         {
  •             return false;
  •         }
  •         $curl = $data = array();
  •         $mh = curl_multi_init();
  •         foreach($this->urls as $k => $v)
  •         {
  •             $curl[$k] = $this->addHandle($mh, $v);
  •         }
  •         $this->execMulitHandle($mh);
  •         foreach($this->urls as $k => $v)
  •         {
  •             $data[$k] = curl_multi_getcontent($curl[$k]);
  •             curl_multi_remove_handle($mh, $curl[$k]);
  •         }
  •         curl_multi_close($mh);
  •         return $data;
  •     }
  •     
  •     //只抓取一个网页的内容。
  •     function execOne($url)
  •     {
  •         if (empty($url)) {
  •             return false;
  •         }
  •         $ch = curl_init($url);
  •         $this->setOneOption($ch);
  •         $content = curl_exec($ch);
  •         curl_close($ch);
  •         return $content;
  •     }
  •     
  •     //内部函数,设置某个handle 的选项
  •     private function setOneOption($ch)
  •     {
  •         curl_setopt_array($ch, $this->options);
  •     }
  •     //添加一个新的并行抓取 handle
  •     private function addHandle($mh, $url)
  •     {
  •         $ch = curl_init($url);
  •         $this->setOneOption($ch);
  •         curl_multi_add_handle($mh, $ch);
  •         return $ch;
  •     }
  •     //并行执行(这样的写法是一个常见的错误,我这里还是采用这样的写法,这个写法
  •     //下载一个小文件都可能导致cup占用100%, 并且,这个循环会运行10万次以上
  •     //这是一个典型的不懂原理产生的错误。这个错误在PHP官方的文档上都相当的常见。)
  •     private function execMulitHandle($mh)
  •     {
  •         $running = null;
  •         do {
  •             curl_multi_exec($mh, $running);
  •         } while ($running > 0);
  •     }
  • }
  • 看最后一个注释最多的函数,这个错误在平时调试的时候可能不太容易发现,因为程序完全正常,但是,在生产服务器下,马上会引起崩溃效果。

    解释为什么不能这样,必须从C 语言内部实现的角度来分析。这个部分将放到下一个教程(PHP高级编程之--单线程实现并行抓取网页 )。不过不是通过C语言来表述原理,而是通过PHP

        这个类,实际上也就很简单的实现了前面我们费了4个教程的篇幅,并且是九牛二虎之力才实现的多线程的抓取网页的功能。在纯PHP的实现下,我们只能用一个后台服务的方式来比较好的实现,但是当你使用 操作系统接口语言 C 语言时候,这个实现当然就更加的简单,灵活,高效。

        就同时抓取几个网页这样一件简单的事情,实际上在底层涉及到了很多东西,对很多半路出家的PHP程序员,可能不喜欢谈多线程这个东西,深入了就涉及到操作系统,浅点说就是并行运行好几个“程序”。但是,很多时候,多线程必不可少,比如要写个快点的爬虫,往往就会浪费九牛二虎之力。不过,PHP的程序员现在应该感谢CURL 这个扩展,这样,你完全不需要用你不太精通的 python 去写爬虫了,对于一个中型大小的爬虫,有这个内部多线程,就已经足够了。

     

    最后是上面的类的一个测试的例子:

  • $urls = array("baidu", "baidu", "baidu", "baidu", "baidu", "baidu", "www.google", "www.sina", );
  • $m = new Http_MultiRequest();
  • $t = microtime(true);
  • $m->setUrls($urls);
  • //parallel fetch(并行抓取):
  • $data = $m->exec();
  • $parallel_time = microtime(true) - $t;
  • echo $parallel_time . "\\n";
  • $t = microtime(true);
  • //serial fetch(串行抓取):
  • foreach ($urls as $url)
  • {
  •     $data[] = $m->execOne($url);
  • }
  • $serial_time = microtime(true) - $t;
  • echo $serial_time . "\\n";
  • 本文标签: 多线程PHP