
本文详解如何利用 php fiber 实现真正的并发流读取:通过同时启动多个非阻塞流 fiber,并轮询调度,避免串行等待,从而高效获取多个远程 url 的分块内容。
本文详解如何利用 php fiber 实现真正的并发流读取:通过同时启动多个非阻塞流 fiber,并轮询调度,避免串行等待,从而高效获取多个远程 url 的分块内容。
在 PHP 8.1+ 中,Fiber 提供了轻量级协程能力,配合 stream_set_blocking(false) 可实现 I/O 多路复用式的异步读取。但需注意:Fiber 本身不提供自动调度器——若按顺序逐个 resume() 直至终止(如原代码),本质仍是同步执行;真正并发的关键在于 “并发启动 + 轮询协作”。
以下是一个完整、可运行的解决方案,支持对多个 HTTP URL 同时发起非阻塞读取,并以 100 字节为单位交错处理响应流:
<?php
function getFiberFromStream($stream, $url): Fiber {
return new Fiber(function ($stream) use ($url): void {
while (!feof($stream)) {
// 非阻塞 fread:若无数据立即返回空字符串(不会挂起)
$chunk = fread($stream, 100);
if ($chunk !== '') {
echo "reading " . strlen($chunk) . " bytes from $url\n";
Fiber::suspend($chunk);
} else {
// 模拟短暂让出控制权,避免忙等(实际中可结合 usleep 或事件循环)
Fiber::suspend('');
}
}
// feof 触发后自动退出,Fiber 终止
});
}
function getContents(array $urls): array {
$contents = [];
$fibers = []; // 存储 [Fiber, current_content, stream] 三元组
// ✅ 第一阶段:并发启动所有 Fiber
foreach ($urls as $index => $url) {
$stream = fopen($url, 'r');
if (!$stream) {
throw new RuntimeException("Failed to open stream for $url");
}
stream_set_blocking($stream, false); // 关键:设为非阻塞模式
$fiber = getFiberFromStream($stream, $url);
$initialContent = $fiber->start($stream); // 启动 Fiber,获取首个 chunk
$fibers[$index] = [$fiber, $initialContent, $stream];
}
// ✅ 第二阶段:轮询调度所有活跃 Fiber(协作式多任务)
$hasActive = true;
while ($hasActive) {
$hasActive = false;
foreach ($fibers as $index => &$item) {
[$fiber, $content, $stream] = $item;
if ($fiber->isTerminated()) {
// Fiber 已完成:保存结果并关闭流
if ($stream) {
fclose($stream);
$item[2] = null; // 防重复关闭
}
$contents[$urls[$index]] = $content;
continue;
}
// Fiber 仍在运行:继续推进一步
$hasActive = true;
try {
$nextChunk = $fiber->resume();
$item[1] = $content .= $nextChunk;
} catch (Throwable $e) {
// 处理 Fiber 内部异常(如网络中断、SSL 错误等)
error_log("Fiber error for {$urls[$index]}: " . $e->getMessage());
$item[1] = $content; // 保留已读内容
$fiber->throw($e); // 可选:向 Fiber 抛出异常
}
}
}
return $contents;
}
// 使用示例(请确保目标站点允许爬取且网络可达)
$urls = [
'https://httpbin.org/delay/1', // 模拟慢响应
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
];
// ⚠️ 注意事项:
// 1. fopen('https://...') 依赖 php-curl 或 OpenSSL 扩展,部分环境需配置 allow_url_fopen=On;
// 2. 非阻塞流在 HTTPS 场景下可能受底层 SSL 握手限制,建议生产环境搭配 ReactPHP/Swoole 等成熟事件循环;
// 3. 此方案为纯 Fiber 协作调度,无内核级异步支持,高并发时仍受限于单线程吞吐;
// 4. 实际项目中应增加超时控制(如 stream_get_meta_data($stream)['timed_out'])、重试机制与错误熔断。
var_dump(getContents($urls));该实现的核心改进在于将“启动”与“执行”解耦:先批量创建并启动所有 Fiber,再通过外层 while 循环统一调度,每次仅推进每个未终止 Fiber 一次(resume()),从而实现类似 select() 的轮询效果。输出日志将呈现理想的交错模式:
reading 123 bytes from https://httpbin.org/delay/1 reading 98 bytes from https://httpbin.org/delay/1 reading 115 bytes from https://httpbin.org/delay/1 reading 0 bytes from https://httpbin.org/delay/1 reading 67 bytes from https://httpbin.org/delay/1 ...
✅ 总结:Fiber 不是银弹,但它是构建用户态异步逻辑的有力工具。要发挥其并发价值,必须主动设计协作调度逻辑——而非依赖“自动并行”。本方案虽简洁,却清晰揭示了 Fiber 编程的本质:手动控制执行流,以换取最大灵活性。进阶场景建议集成 ext-event 或迁移到 Swoole/ReactPHP 等专业异步框架。
立即学习“PHP免费学习笔记(深入)”;











