一个php并发多任务执行的例子。
时间:2010-07-19 来源:beingchou
欢迎拍砖。
<?php
/**
* 多任务并发执行演示.
* @author [email protected]
* 请在linux的shell里面执行本程序,珍爱生命,远离windows.
*/
//最大并发数,用来保护服务器不被累死
define('MAX_CONCURRENCY', 10);
/**
* 多任务并发类
*/
class MultiTask
{
/**
* 任务池
* @var array $pool
*/
private $pool = array();
/**
* 并发数
* @var int $concurrency
*/
private $concurrency = 1;
/**
* 设置并发数
* @param int $concurrency
*/
public function setConcurrency($concurrency = 1)
{
if($concurrency>=1 && $concurrency<=MAX_CONCURRENCY)
{
$this->concurrency = $concurrency;
}
}
/**
* 添加一个新任务. 如果任务池满了,就先消化一个任务池内的任务.
* @param Task $new_task
* @return bool
*/
public function addTask(Task $new_task)
{
$pool = &$this->pool;
$status = $new_task->status();
if($this->isFull())
{
$this->doWork();
}
$pool[] = $new_task;
echo "new task {$status['command']}\r\n";
return true;
}
/**
* 消化一个任务池内的任务,本方法是本程序的核心所在
* @return bool
*/
private function doWork()
{
$pool = &$this->pool;
while($this->notEmpty())
{
foreach($pool as $tid=>$task)
{
$status = $task->status();
if($task->isRunning())
{
if($task->isTimeout())
{
echo "task:$tid:{$status['command']} timeout, force closed!\r\n";
$task->terminate();
unset($pool[$tid]);
return false;
}
echo "checking task:$tid:{$status['command']}:".intval($status['excute_time'])."\r\n";
}
else
{
echo "task:$tid:{$status['command']} finished!\r\n";
$task->close();
unset($pool[$tid]);
return true;
}
}
sleep(1);
}
return true;
}
/**
* 判断任务池满了
* @return bool
*/
private function isFull()
{
return count($this->pool) >= $this->concurrency;
}
/**
* 判断任务池非空
* @return bool
*/
private function notEmpty()
{
return !empty($this->pool);
}
/**
* 全部任务添加后的完成阶段
*/
public function finish()
{
while($this->notEmpty())
{
$this->doWork();
sleep(1);
}
echo "All tasks have been done!\r\n";
}
}
/*
* 任务类
*/
class Task
{
/**
* 任务句柄
* @var resource $handle
*/
private $handle;
/**
* 任务开始时间
* @var time $start_time
*/
private $start_time;
/**
* 命令管道,包括 0=>输入 1=>输出 2=>错误
* @var array $pipes
*/
private $pipes = array();
/**
* 任务状态
* @var array $status
*/
private $status = array();
/**
* 指定任务运行所需的环境变量
* @var array $env
*/
private $env = array();
/**
* 指定任务运行所需的当前路径
* @var string $cwd
*/
private $cwd = '';
/**
* 超时时间,单位为秒,默认为0,值为0时,永不超时.
* @var int $timeout
*/
private $timeout = 0;
/**
* 错误日志的路径
* @var string $error_log
*/
private $error_log = './error.log';
/**
* 用构造函数来调用命令
* @param string $cmd
* @param array $param
*/
public function __construct($cmd, $param=array())
{
$this->cwd = getcwd();
//只接受合法的参数
foreach(array(
'env',
'cwd',
'timeout',
'error_log',
) as $validparam)
{
if(isset($param[$validparam]))
{
$this->$validparam = $param[$validparam];
}
}
$desc = array(
0=>array('pipe','r'),
1=>array('pipe','w'),
2=>array('file', $this->error_log, 'a')
);
//执行命令
$this->handle = proc_open($cmd, $desc, $this->pipes, $this->cwd, $this->env);
//把输出设成非阻塞
stream_set_blocking($this->pipes[1], 0);
$this->start_time = microtime(true);
}
/**
* 任务超时的判断,调用本方法前应先调用 $task->status()方法
* @return bool
*/
public function isTimeout()
{
return $this->timeout
? $this->status['excute_time'] >= $this->timeout
: false;
}
/**
* 判断任务是否在执行,调用本方法前应先调用 $task->status()方法
* @return bool
*/
public function isRunning()
{
return $this->status['running'];
}
/**
* 正常结束任务
*/
public function close()
{
if(is_resource($this->handle))
{
proc_close($this->handle);
}
}
/**
* 强行终止超时的任务
*/
public function terminate()
{
if(is_resource($this->handle))
{
proc_terminate($this->handle);
proc_close($this->handle);
}
}
/**
* 获取任务状态
* @return array
*/
public function status()
{
$status = & $this->status;
//获取进程句柄的状态
$status = proc_get_status($this->handle);
$status['start_time'] = $this->start_time;
$status['excute_time'] = microtime(true) - $this->start_time;
return $status;
}
}
$m = new MultiTask();
$m->setConcurrency(10);
//执行超时的例子
$cmd = "sleep 1000";
$m->addTask(new Task($cmd, array('timeout'=>10)));
for($i=0;$i<20;$i++)
{
//正常执行的例子
$cmd = "sleep ".rand(1,20);
$m->addTask(new Task($cmd));
}
//任务结束,别忘了
$m->finish();
?>
<?php
/**
* 多任务并发执行演示.
* @author [email protected]
* 请在linux的shell里面执行本程序,珍爱生命,远离windows.
*/
//最大并发数,用来保护服务器不被累死
define('MAX_CONCURRENCY', 10);
/**
* 多任务并发类
*/
class MultiTask
{
/**
* 任务池
* @var array $pool
*/
private $pool = array();
/**
* 并发数
* @var int $concurrency
*/
private $concurrency = 1;
/**
* 设置并发数
* @param int $concurrency
*/
public function setConcurrency($concurrency = 1)
{
if($concurrency>=1 && $concurrency<=MAX_CONCURRENCY)
{
$this->concurrency = $concurrency;
}
}
/**
* 添加一个新任务. 如果任务池满了,就先消化一个任务池内的任务.
* @param Task $new_task
* @return bool
*/
public function addTask(Task $new_task)
{
$pool = &$this->pool;
$status = $new_task->status();
if($this->isFull())
{
$this->doWork();
}
$pool[] = $new_task;
echo "new task {$status['command']}\r\n";
return true;
}
/**
* 消化一个任务池内的任务,本方法是本程序的核心所在
* @return bool
*/
private function doWork()
{
$pool = &$this->pool;
while($this->notEmpty())
{
foreach($pool as $tid=>$task)
{
$status = $task->status();
if($task->isRunning())
{
if($task->isTimeout())
{
echo "task:$tid:{$status['command']} timeout, force closed!\r\n";
$task->terminate();
unset($pool[$tid]);
return false;
}
echo "checking task:$tid:{$status['command']}:".intval($status['excute_time'])."\r\n";
}
else
{
echo "task:$tid:{$status['command']} finished!\r\n";
$task->close();
unset($pool[$tid]);
return true;
}
}
sleep(1);
}
return true;
}
/**
* 判断任务池满了
* @return bool
*/
private function isFull()
{
return count($this->pool) >= $this->concurrency;
}
/**
* 判断任务池非空
* @return bool
*/
private function notEmpty()
{
return !empty($this->pool);
}
/**
* 全部任务添加后的完成阶段
*/
public function finish()
{
while($this->notEmpty())
{
$this->doWork();
sleep(1);
}
echo "All tasks have been done!\r\n";
}
}
/*
* 任务类
*/
class Task
{
/**
* 任务句柄
* @var resource $handle
*/
private $handle;
/**
* 任务开始时间
* @var time $start_time
*/
private $start_time;
/**
* 命令管道,包括 0=>输入 1=>输出 2=>错误
* @var array $pipes
*/
private $pipes = array();
/**
* 任务状态
* @var array $status
*/
private $status = array();
/**
* 指定任务运行所需的环境变量
* @var array $env
*/
private $env = array();
/**
* 指定任务运行所需的当前路径
* @var string $cwd
*/
private $cwd = '';
/**
* 超时时间,单位为秒,默认为0,值为0时,永不超时.
* @var int $timeout
*/
private $timeout = 0;
/**
* 错误日志的路径
* @var string $error_log
*/
private $error_log = './error.log';
/**
* 用构造函数来调用命令
* @param string $cmd
* @param array $param
*/
public function __construct($cmd, $param=array())
{
$this->cwd = getcwd();
//只接受合法的参数
foreach(array(
'env',
'cwd',
'timeout',
'error_log',
) as $validparam)
{
if(isset($param[$validparam]))
{
$this->$validparam = $param[$validparam];
}
}
$desc = array(
0=>array('pipe','r'),
1=>array('pipe','w'),
2=>array('file', $this->error_log, 'a')
);
//执行命令
$this->handle = proc_open($cmd, $desc, $this->pipes, $this->cwd, $this->env);
//把输出设成非阻塞
stream_set_blocking($this->pipes[1], 0);
$this->start_time = microtime(true);
}
/**
* 任务超时的判断,调用本方法前应先调用 $task->status()方法
* @return bool
*/
public function isTimeout()
{
return $this->timeout
? $this->status['excute_time'] >= $this->timeout
: false;
}
/**
* 判断任务是否在执行,调用本方法前应先调用 $task->status()方法
* @return bool
*/
public function isRunning()
{
return $this->status['running'];
}
/**
* 正常结束任务
*/
public function close()
{
if(is_resource($this->handle))
{
proc_close($this->handle);
}
}
/**
* 强行终止超时的任务
*/
public function terminate()
{
if(is_resource($this->handle))
{
proc_terminate($this->handle);
proc_close($this->handle);
}
}
/**
* 获取任务状态
* @return array
*/
public function status()
{
$status = & $this->status;
//获取进程句柄的状态
$status = proc_get_status($this->handle);
$status['start_time'] = $this->start_time;
$status['excute_time'] = microtime(true) - $this->start_time;
return $status;
}
}
$m = new MultiTask();
$m->setConcurrency(10);
//执行超时的例子
$cmd = "sleep 1000";
$m->addTask(new Task($cmd, array('timeout'=>10)));
for($i=0;$i<20;$i++)
{
//正常执行的例子
$cmd = "sleep ".rand(1,20);
$m->addTask(new Task($cmd));
}
//任务结束,别忘了
$m->finish();
?>
相关阅读 更多 +