
本文详解如何利用 PHP 8.1+ 的 Fiber 特性,结合非阻塞流(stream_set_blocking(false)),实现对多个 URL 的并发、交错式 fread 读取,避免串行等待,真正达成协程级 I/O 多路复用。
本文详解如何利用 PHP 8.1+ 的 Fiber 特性,结合非阻塞流(stream_set_blocking(false)),实现对多个 URL 的并发、交错式 fread 读取,避免串行等待,真正达成协程级 I/O 多路复用。
在 PHP 中,Fiber 提供了轻量级协程能力,但其本身不自动调度——它不会像 Go 的 goroutine 或 Node.js 的事件循环那样隐式轮转多个 Fiber。若按顺序启动并耗尽每个 Fiber(如原代码中 while (!$fiber-youjiankuohaophpcnisTerminated()) { $fiber->resume(); }),实际仍是同步串行执行,无法体现并发优势。
要实现真正的“交错读取”(即每读取一个 URL 的 100 字节后,立即切换到下一个 URL),关键在于:先批量启动所有 Fiber,再通过轮询(round-robin)方式交替驱动它们,每次仅推进一步,直到全部完成。
以下是优化后的完整实现:
立即学习“PHP免费学习笔记(深入)”;
<?php
function getFiberFromStream($stream, $url): Fiber {
return new Fiber(function ($stream) use ($url): void {
while (!feof($stream)) {
// 非阻塞 fread:若无数据可读,立即返回 ''(而非阻塞等待)
$chunk = fread($stream, 100);
if ($chunk === false || $chunk === '') {
// 模拟短暂让出控制权,避免忙等;生产环境建议结合 stream_select()
usleep(1000);
continue;
}
echo "reading " . strlen($chunk) . " bytes from $url" . PHP_EOL;
Fiber::suspend($chunk);
}
// 显式关闭流前确保读完(可选)
fclose($stream);
});
}
function getContents(array $urls): array {
$contents = [];
$fibers = []; // 存储 [Fiber, currentContent, stream] 元组
// ✅ 第一阶段:批量初始化并启动所有 Fiber
foreach ($urls as $key => $url) {
$stream = fopen($url, 'r');
if (!$stream) {
throw new RuntimeException("Failed to open stream for $url");
}
stream_set_blocking($stream, false); // 关键:设为非阻塞模式
$fiber = getFiberFromStream($stream, $url);
$initialContent = $fiber->start($stream); // 启动 Fiber,获取首块内容
$fibers[$key] = [$fiber, $initialContent, $stream];
}
// ✅ 第二阶段:轮询调度 —— 真正实现并发交错执行
$hasActive = true;
while ($hasActive) {
$hasActive = false;
foreach ($fibers as $key => &$item) {
[$fiber, $content, $stream] = $item;
if (!$fiber->isTerminated()) {
$hasActive = true;
try {
$nextChunk = $fiber->resume();
$item[1] = $content .= $nextChunk; // 更新当前累积内容
} catch (Throwable $e) {
// Fiber 内部异常处理(如网络中断、SSL 错误)
error_log("Fiber for {$urls[$key]} failed: " . $e->getMessage());
$item[1] = $content; // 保留已读内容
$fiber->throw($e); // 可选:传播异常
}
} else {
// Fiber 已终止:保存结果并安全关闭流
if ($stream && is_resource($stream)) {
// 注意:Fiber 内部已 fclose,此处双重检查更安全
@fclose($stream);
$item[2] = null; // 标记流已关闭
}
$contents[$urls[$key]] = $content;
unset($fibers[$key]); // 清理已完成项(可选,提升后续轮询效率)
}
}
// 小休眠防 CPU 空转(仅用于演示;生产环境应替换为 stream_select)
if ($hasActive) {
usleep(500);
}
}
return $contents;
}
// 使用示例
$urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
];
try {
$results = getContents($urls);
echo "\n✅ All downloads completed.\n";
foreach ($urls as $url) {
echo "Length of {$url}: " . strlen($results[$url]) . " bytes\n";
}
} catch (Exception $e) {
echo "Error: " . $e->getMessage() . "\n";
}⚠️ 重要注意事项
- 非阻塞流是前提:必须调用 stream_set_blocking($stream, false),否则 fread() 会阻塞整个 Fiber,失去并发意义。
- fread() 在非阻塞模式下的行为:若缓冲区暂无数据,fread() 返回空字符串 ''(不是 false),需主动跳过或重试,不可直接拼接。
- 轮询开销问题:上述 usleep() 是简化方案。生产环境强烈推荐改用 stream_select() 对多个流进行就绪态监听,实现零忙等、高效率的 I/O 多路复用。Fiber 调度器可封装为基于 stream_select 的事件循环。
- 错误与超时处理:真实场景需为流设置超时(stream_context_set_option($context, 'http', 'timeout', 10))、捕获 fopen/fread 异常,并添加重试逻辑。
- 内存管理:大文件下载时,避免将全部内容累积在内存中;可改为边读边写入临时文件或回调处理。
✅ 总结
Fiber 本身不提供自动调度,但赋予了手动构建协程调度器的能力。通过「批量启动 + 轮询推进」模式,配合非阻塞流,即可在 PHP 中实现简洁、可控的并发 I/O。这为构建高性能 CLI 工具、爬虫、微服务聚合层等场景提供了原生、低开销的协程基础。下一步进阶方向是集成 stream_select 或使用 Swoole/ReactPHP 等成熟异步框架进一步提升可扩展性。











