0

0

php如何使用命令行实现异步多进程模式的任务处理(代码)

不言

不言

发布时间:2019-01-23 10:20:28

|

3106人浏览过

|

来源于segmentfault

转载

本篇文章给大家带来的内容是关于php如何使用命令行实现异步多进程模式的任务处理(代码),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

用PHP来实现异步任务一直是个难题,现有的解决方案中:PHP知名的异步框架有 swoole 和 Workerman,但都是无法在 web 环境中直接使用的,即便强行搭建 web 环境,异步调用也是使用多进程模式实现的。但有时真的不需要用启动服务的方式,让服务端一直等待客户端消息,何况中间还不能改动服务端代码。本文就介绍一下不使用任何框架和第三方库的情况下,在 CLI 环境中如何实现多进程以及在web环境中的异步调用。

在 web 环境的异步调用

常用的方式有两种

1. 使用 socket 连接

立即学习PHP免费学习笔记(深入)”;

这种方式就是典型的C/S架构,需要有服务端支持。

// 1. 创建socket套接字
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 2. 进行socket连接
socket_connect($socket, '127.0.0.1', '3939');
//socket_set_nonblock($socket); // 以非阻塞模式运行,由于在客户端不实用,所以这里不考虑
// 3. 向服务端发送请求
socket_write($socket, $request, strlen($request));
// 4. 接受服务端的回应消息(忽略非阻塞的情况,如果服务端不是提供异步服务,那这一步可以省略)
$recv = socket_read($socket, 2048);
// 5. 关闭socket连接
socket_close($socket);

2. 使用 popen 打开进程管道

这种方式是使用操作系统命令,由操作系统直接执行。

本文讨论的异步调用就是使用这种方式。

$sf = '/path/to/cli_async_task.php'; //要执行的脚本文件
$op = 'call'; //脚本文件接收的参数1
$data = base64_encode(serialize(['TestTask', 'arg1', 'arg2'])); //脚本文件接收的参数2
pclose(popen("php '$sf' --op $op --data $data &", 'r')); //打开之后接着就关闭进程管道,让该进程以守护模式运行
echo PHP_EOL.'异步任务已执行。'.PHP_EOL;

这种方式的优点就是:一步解决,当前进程不需要任何开销。
缺点也很明显:无法跟踪任务脚本的运行状态。
所以重头戏会是在执行任务的脚本文件上,下面就介绍任务处理和多进程的实现方式。

CLI 环境的多进程任务处理

注意:多进程模式仅支持Linux,不支持Windows!!

这里会从0开始(未使用任何框架和类库)介绍每一个步骤,最后会附带一份完整的代码

1. 创建脚本

  • 任何脚本不可忽视的地方就是错误处理。所以写一个任务处理脚本首先就是写错误处理方式。

在PHP中就是调用 set_exception_handler set_error_handler register_shutdown_function 这三个函数,然后写上自定义的处理方法。

  • 接着是定义自动加载函数 spl_autoload_register 免去每使用一个新类都要 require / include 的烦恼。

  • 定义日志操作方法。

  • 定义任务处理方法。

  • 读取来自命令行的参数,开始执行任务。

    Sesame AI
    Sesame AI

    一款开创性的语音AI伴侣,具备先进的自然对话能力和独特个性。

    下载

2. 多进程处理

PHP 创建多进程是使用 pcntl_fork 函数,该函数会 fork 一份当前进程(影分身术),于是就有了两个进程,当前进程是主进程(本体),fork 出的进程是子进程(影分身)。需要注意的是两个进程代码环境是一样的,两个进程都是执行到了 pcntl_fork 函数位置。区别就是 getmypid 获得的进程号不一样,最重要的区分是当调用 pcntl_fork函数时,子进程获得的返回值是 0,而主进程获得的是子进程的进程号 pid。

好了,当我们知道谁是子进程后,就可以让该子进程执行任务了。

那么主进程是如何得知子进程的状态呢?
使用 pcntl_wait。该函数有两个参数 $status 和 $options ,$status 是引用类型,用来存储子进程的状态,$options 有两个可选常量WNOHANG| WUNTRACED,分别表示不等待子进程结束立即返回和等待子进程结束。很明显使用WUNTRACED会阻塞主进程。(也可以使用 pcntl_waitpid 函数获取特定 pid 子进程状态)

在多进程中,主进程要做的就是管理每个子进程的状态,否则子进程很可能无法退出而变成僵尸进程。

关于多进程间的消息通信
这一块需要涉及具体的业务逻辑,所以只能简单的提一下。不考虑使用第三方比如 redis 等服务的情况下,PHP原生可以实现就是管道通信共享内存等方式。实现起来都比较简单,缺点就是可使用的数据容量有限,只能用简单文本协议交换数据。

如何手动结束所有进程任务

如果多进程处理不当,很可能导致进程任务卡死,甚至占用过多系统资源,此时只能手动结束进程。
除了一个个的根据进程号来结束,还有一个快速的方法是首先在任务脚本里自定义进程名称,就是调用cli_set_process_title函数,然后在命令行输入:ps aux|grep cli_async_worker |grep -v grep|awk '{print $2}'|xargs kill -9 (里面的 cli_async_worker 就是自定义的进程名称),这样就可以快速结束多进程任务了。

以下是完整的任务执行脚本代码:

可能无法直接使用,需要修改的地方有:

  1. 脚本目录和日志目录常量

  2. 自动加载任务类的方法(默认是加载脚本目录中以Task结尾的文件)

  3. 其他的如:错误和日志处理方式和文本格式就随意吧...

  4. 如果命名管道文件设置有错误,可能导致进程假死,你可能需要手动删除进程管道通信的代码。

  5. 多进程的例子:execAsyncTask('multi', [ 'test' => ['a', 'b', 'c'], 'grab' => [['url' => 'https://www.baidu.com', 'callback' => 'http://localhost']] ]);。执行情况可以在日志文件中查看。execAsyncTask函数参考【__使用popen打开进程管道__】。

<?php

error_reporting(E_ALL ^ E_NOTICE ^ E_USER_WARNING);
@ini_set('display_errors', 0);
@ini_set('date.timezone', 'PRC');

chdir(__DIR__);

/* 任务脚本目录 */
defined('TASK_PATH') or define('TASK_PATH', realpath(__DIR__ .'/tasks'));
/* 任务日志目录 */
defined('TASK_LOGS_PATH') or define('TASK_LOGS_PATH', __DIR__ .'/tasks/logs');

if (!is_dir(TASK_LOGS_PATH)) @mkdir(TASK_LOGS_PATH, 0777, true);

set_exception_handler(function($e) {
    $time = date('H:i:s', time());
    $msg = sprintf(''. '<h3>[%s] %s (%s)</h3>'. "\n". '<pre class="brush:php;toolbar:false;">%s
',         $time, $e->getMessage(), $e->getCode(), $e->getTraceAsString()     );     file_put_contents(TASK_LOGS_PATH .'/exception-'.date('Ymd').'.log', $msg.PHP_EOL, FILE_APPEND|LOCK_EX); }); set_error_handler(function($errno, $errmsg, $filename, $line) {     if (!(error_reporting() & $errno)) return;     ob_start();     debug_print_backtrace();     $backtrace = ob_get_contents(); ob_end_clean();     $datetime = date('Y-m-d H:i:s', time());     $msg =  $header) {             if (!is_numeric($_k))                  $header = sprintf('%s: %s', $_k, $header);             $_headers .= $header . "\r\n";         }     }     $headers = "Connection: close\r\n" . $_headers;     $opts = array(         'http' => array(             'method' => strtoupper(@$job['method'] ?: 'get'),             'content' => @$job['data'] ?: null,             'header' => $headers,             'user_agent' => @$job['args']['user_agent'] ?: 'HTTPGRAB/1.0 (compatible)',             'proxy' => @$job['args']['proxy'] ?: null,             'timeout' => intval(@$job['args']['timeout'] ?: 120),             'protocol_version' => @$job['args']['protocol_version'] ?: '1.1',             'max_redirects' => 3,             'ignore_errors' => true         )     );     $ret = @file_get_contents($url, false, stream_context_create($opts));     //debug_log($url.' -->'.strlen($ret));     if ($ret and isset($job['callback'])) {         $postdata = http_build_query(array(                 'msg_id' => @$job['msg_id'] ?: 0,                 'url' => @$job['url'],                 'result' => $ret             ));         $opts = array(             'http' => array(                 'method' => 'POST',                 'header' => 'Content-type:application/x-www-form-urlencoded'. "\r\n",                 'content' => $postdata,                 'timeout' => 30             )         );         file_get_contents($job['callback'], false, stream_context_create($opts));         //debug_log(json_encode(@$http_response_header));         //debug_log($job['callback'].' -->'.$ret2);     }          return $ret; } function clean($tmpdirs, $expires=3600*24*7) {     $ret = [];     foreach ((array)$tmpdirs as $tmpdir) {         $ret[$tmpdir] = 0;         foreach (glob($tmpdir.DIRECTORY_SEPARATOR.'*') as $_file) {             if (fileatime($_file) open($file, \ZipArchive::CREATE)) {         return false;     }     _backup_dir($zip, $dest);          $zip->close();     return $file; } function _backup_dir($zip, $dest, $sub='') {     $dest = rtrim($dest, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;     $sub = rtrim($sub, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;     $dir = opendir($dest);     if (!$dir) return false;     while (false !== ($file = readdir($dir))) {         if (is_file($dest . $file)) {             $zip->addFile($dest . $file, $sub . $file);         } else {             if ($file != '.' and $file != '..' and is_dir($dest . $file)) {                 //$zip->addEmptyDir($sub . $file . DIRECTORY_SEPARATOR);                 _backup_dir($zip, $dest . $file, $file);             }         }     }     closedir($dir);     return true; } function execute_task($op, $data) {     debug_log('Start...');     $t1 = microtime(true);     switch($op) {     case 'call': //执行任务脚本类         $cmd = $data;         if (is_string($cmd) and class_exists($cmd)) $cmd = new $cmd;         elseif (is_array($cmd)) {             if (is_string($cmd[0]) and class_exists($cmd[0])) $cmd[0] = new $cmd[0];         }         $ret = call($cmd);         break;     case 'grab': //抓取网页         if (is_string($data)) $data = ['url' => $data];         if (is_array($data)) $ret = grab($data);         else throw new \Exception('无效的命令参数!');         break;     case 'clean': //清理缓存文件夹:dirs 需要清理的文件夹列表,expires 过期时间(秒,默认7天)         if (isset($data['dirs'])) {             $ret = clean($data['dirs'], @$data['expires']);         } else {             $ret = clean($data);         }         break;     case 'backup': //备份文件:zip 备份到哪个zip文件,dest 需要备份的文件夹         if (isset($data['zip']) and is_dir($data['dest']))             $ret = backup($data['zip'], $data['dest']);         else             throw new \Exception('没有指定需要备份的文件!');         break;     case 'require': //加载脚本文件         if (is_file($data)) $ret = require($data);         else throw new \Exception('不是可请求的文件!');         break;     case 'test':         sleep(rand(1, 5));         $ret = ucfirst(strval($data)). '.PID:'. getmypid();         break;     case 'multi': //多进程处理模式         $results = $childs = [];         $fifo = TASK_LOGS_PATH . DIRECTORY_SEPARATOR . 'pipe.'. posix_getpid();         if (!file_exists($fifo)) {             if (!posix_mkfifo($fifo, 0666)) { //开启进程数据通信管道                 throw new Exception('make pipe failed!');             }         }         //$shmid = shmop_open(ftok(__FILE__, 'h'), 'c', 0644, 4096); //共享内存         //shmop_write($shmid, serialize([]), 0);         //$data = unserialize(shmop_read($shmid, 0, 4096));         //shmop_delete($shmid);         //shmop_close($shmid);         foreach($data as $_op => $_datas) {             $_datas = (array)$_datas; //data 格式为数组表示一个 op 有多个执行数据             foreach($_datas as $_data) {                 $pid = pcntl_fork();                 if ($pid == 0) { //子进程中执行任务                     $_ret = execute_task($_op, $_data);                     $_pid = getmypid();                     $pipe = fopen($fifo, 'w'); //写                     //stream_set_blocking($pipe, false);                     $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => $_ret]);                     if (strlen($_ret) > 4096) //写入管道的数据最大4K                         $_ret = serialize(['pid' => $_pid, 'op' => $_op, 'args' => $_data, 'result' => '[RESPONSE_TOO_LONG]']);                     //debug_log('write pipe: '.$_ret);                     fwrite($pipe, $_ret.PHP_EOL);                     fflush($pipe);                     fclose($pipe);                     exit(0); //退出子进程                 } elseif ($pid > 0) { //主进程中记录任务                     $childs[] = $pid;                     $results[$pid] = 0;                     debug_log('fork by child: '.$pid);                     //pcntl_wait($status, WNOHANG);                 } elseif ($pid == -1) {                     throw new Exception('could not fork at '. getmygid());                 }             }         }         $pipe = fopen($fifo, 'r+'); //读         stream_set_blocking($pipe, true); //阻塞模式,PID与读取的管道数据可能会不一致。         $n = 0;         while(count($childs) > 0) {             foreach($childs as $i => $pid) {                 $res = pcntl_waitpid($pid, $status, WNOHANG);                 if (-1 == $res || $res > 0) {                     $_ret = @unserialize(fgets($pipe)); //读取管道数据                     $results[$pid] = $_ret;                     unset($childs[$i]);                     debug_log('read child: '.$pid . ' - ' . json_encode($_ret, 64|256));                 }                 if ($n > 1000) posix_kill($pid, SIGTERM); //超时(10分钟)结束子进程             }             usleep(200000); $n++;         }         debug_log('child process completed.');         @fclose($pipe);         @unlink($fifo);         $ret = json_encode($results, 64|256);         break;     default:         throw new \Exception('没有可执行的任务!');         break;     }     $t2 = microtime(true);     $times = round(($t2 - $t1) * 1000, 2);     $log = sprintf('[%s] %s --> (%s) %sms', strtoupper($op),          @json_encode($data, 64|256), @strlen($ret)


相关文章

PHP速学教程(入门到精通)
PHP速学教程(入门到精通)

PHP怎么学习?PHP怎么入门?PHP在哪学?PHP怎么学才快?不用担心,这里为大家提供了PHP速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
swoole为什么能常驻内存
swoole为什么能常驻内存

swoole常驻内存的特性:1. 事件驱动模型减少内存消耗;2. 协程并行执行任务占用更少内存;3. 协程池预分配协程消除创建开销;4. 静态变量保留状态减少内存分配;5. 共享内存跨协程共享数据降低内存开销。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

306

2024.04.10

python中print函数的用法
python中print函数的用法

python中print函数的语法是“print(value1, value2, ..., sep=' ', end=' ', file=sys.stdout, flush=False)”。本专题为大家提供print相关的文章、下载、课程内容,供大家免费下载体验。

192

2023.09.27

python print用法与作用
python print用法与作用

本专题整合了python print的用法、作用、函数功能相关内容,阅读专题下面的文章了解更多详细教程。

18

2026.02.03

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1566

2023.10.24

require的用法
require的用法

require的用法有引入模块、导入类或方法、执行特定任务。想了解更多require的相关内容,可以阅读本专题下面的文章。

509

2023.11.27

windows查看端口占用情况
windows查看端口占用情况

Windows端口可以认为是计算机与外界通讯交流的出入口。逻辑意义上的端口一般是指TCP/IP协议中的端口,端口号的范围从0到65535,比如用于浏览网页服务的80端口,用于FTP服务的21端口等等。怎么查看windows端口占用情况呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

1496

2023.07.26

查看端口占用情况windows
查看端口占用情况windows

端口占用是指与端口关联的软件占用端口而使得其他应用程序无法使用这些端口,端口占用问题是计算机系统编程领域的一个常见问题,端口占用的根本原因可能是操作系统的一些错误,服务器也可能会出现端口占用问题。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

1170

2023.07.27

windows照片无法显示
windows照片无法显示

当我们尝试打开一张图片时,可能会出现一个错误提示,提示说"Windows照片查看器无法显示此图片,因为计算机上的可用内存不足",本专题为大家提供windows照片无法显示相关的文章,帮助大家解决该问题。

835

2023.08.01

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PHP课程
PHP课程

共137课时 | 13.3万人学习

JavaScript ES5基础线上课程教学
JavaScript ES5基础线上课程教学

共6课时 | 11.3万人学习

PHP新手语法线上课程教学
PHP新手语法线上课程教学

共13课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号