
本文深入探讨了go语言并发爬虫在处理失败url重入队列时可能遇到的通道死锁问题。通过分析原始设计中所有工作协程同时阻塞在输入通道的缺陷,提出了引入独立“失败”通道的解决方案。文章提供了详细的代码示例,并解析了如何通过`select`语句高效管理任务分发与失败重试,确保爬虫稳定运行,避免因并发逻辑不当导致的程序停滞。
Go语言以其强大的并发特性和Goroutine、Channel机制,成为构建高性能并发爬虫的理想选择。然而,在设计复杂的任务调度和错误重试逻辑时,如果不慎处理通道间的交互,很容易引入死锁,导致程序意外停滞。
一个典型的Go并发爬虫结构通常包括:
在某些爬虫实现中,为了确保所有URL都能被成功处理,会设计一个重试机制:如果一个URL在处理过程中失败(例如,HTTP请求失败),它会被重新放回输入通道,等待再次处理。这种设计在理论上看似合理,但在高并发场景下,尤其是在所有工作协程同时遇到失败并尝试重入队列时,可能导致程序在运行一段时间后无故停滞。
用户反馈的现象是,爬虫在运行几分钟后(例如5-10分钟)便“卡住”,即使待处理的URL列表尚未耗尽,也无法继续工作。经过排查,并非目标网站的封禁,也不是数据库写入问题,而是程序内部的并发逻辑出现了问题。
导致这种停滞的根本原因在于通道死锁。让我们分析一下原始的worker和crawl函数片段:
func worker(input chan string, output chan SiteData) {
for url := range input { // (A) 从输入通道接收URL
resp, status := downloadURL(url)
if resp != nil && status == 200 {
output <- processSiteData(resp)
} else {
input <- url // (B) 失败时将URL重新放回输入通道
}
}
}
func crawl(urlList []string) {
numWorkers := 4
input := make(chan string)
output := make(chan SiteData)
for i := 0; i < numWorkers; i++ {
go worker(input, output)
}
go func() { // (C) 初始URL分发协程
for url := range urlList {
input <- url
}
}()
for { // (D) 结果收集协程
select {
case data := <-output:
saveToDB(data)
}
}
}死锁场景分析:
最终结果是:所有工作协程都阻塞在向input通道发送数据,而没有协程从input通道接收数据,从而形成一个典型的发送-发送死锁。程序中的所有活动协程都处于阻塞状态,导致整个程序停滞。
为了解决上述死锁问题,核心思想是将失败任务的重入逻辑与正常任务的分发逻辑解耦。我们可以引入一个独立的“失败通道” (failed chan string) 来专门收集那些需要重试的URL。
worker函数现在需要接收三个通道:input、output和failed。
func worker(input chan string, output chan SiteData, failed chan string) {
for url := range input {
resp, status := downloadURL(url)
if resp != nil && status == 200 {
output <- processSiteData(resp)
} else {
failed <- url // 将失败的URL发送到独立的failed通道
}
}
}crawl函数中的任务调度逻辑将变得更加复杂,它需要一个中心化的协程来管理URL列表,并使用select语句来非阻塞地处理新的URL分发和失败URL的重试。
func crawl(urlList []string) {
numWorkers := 4
input := make(chan string)
failed := make(chan string)
output := make(chan SiteData)
// 1. 启动工作协程
for i := 0; i < numWorkers; i++ {
go worker(input, output, failed)
}
// 2. 任务调度协程:负责分发URL和处理失败重试
go func() {
pendingURLs := urlList // 维护一个动态的待处理URL列表
for {
// 如果没有待处理的URL,则等待失败的URL或退出
if len(pendingURLs) == 0 {
select {
case url := <-failed: // 仅接收失败的URL
pendingURLs = append(pendingURLs, url)
// TODO: 添加一个退出机制,当所有任务完成时关闭通道
}
} else {
// 使用select同时尝试发送URL到input通道和接收失败URL
select {
case input <- pendingURLs[0]: // 尝试发送第一个待处理URL
pendingURLs = pendingURLs[1:] // 发送成功则移除
case url := <-failed: // 接收失败的URL并重新加入列表
pendingURLs = append(pendingURLs, url)
}
}
// 考虑添加一个退出条件,例如当pendingURLs为空且所有worker都已完成时
}
}()
// 3. 结果收集协程
for {
data := <-output
saveToDB(data)
// TODO: 添加一个退出机制,当所有任务完成时关闭通道
}
}为了使crawl函数能够优雅地退出,我们需要更精细地管理pendingURLs列表以及判断何时所有任务都已完成。以下是一个更完善的示例:
package main
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
// SiteData 模拟网站数据结构
type SiteData struct {
URL string
Status int
BodyLen int
// ... 其他处理后的数据
}
// downloadURL 模拟下载URL内容
func downloadURL(url string) (body []byte, status int) {
fmt.Printf("Downloading: %s\n", url)
resp, err := http.Get(url)
if err != nil {
fmt.Printf("Error downloading %s: %v\n", url, err)
return nil, 0
}
defer resp.Body.Close()
status = resp.StatusCode
if status != 200 {
fmt.Printf("Non-200 status for %s: %d\n", url, status)
return nil, status
}
body, err = ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("Error reading body for %s: %v\n", url, err)
return nil, status
}
body = bytes.Trim(body, "\x00") // 移除可能的空字节
// 模拟随机失败
if url == "http://example.com/fail1" || url == "http://example.com/fail2" {
fmt.Printf("Simulating failure for %s\n", url)
return nil, 500 // 模拟失败
}
time.Sleep(50 * time.Millisecond) // 模拟下载耗时
return body, status
}
// processSiteData 模拟数据处理
func processSiteData(url string, resp []byte) SiteData {
fmt.Printf("Processing: %s (body len: %d)\n", url, len(resp))
time.Sleep(20 * time.Millisecond) // 模拟处理耗时
return SiteData{URL: url, Status: 200, BodyLen: len(resp)}
}
// saveToDB 模拟数据保存到数据库
func saveToDB(data SiteData) {
fmt.Printf("Saving to DB: %s (Status: %d, BodyLen: %d)\n", data.URL, data.Status, data.BodyLen)
time.Sleep(10 * time.Millisecond) // 模拟DB写入耗时
}
// worker 协程:从input接收URL,处理后发送到output或failed
func worker(id int, input chan string, output chan SiteData, failed chan string, wg *sync.WaitGroup) {
defer wg.Done()
for url := range input {
body, status := downloadURL(url)
if body != nil && status == 200 {
output <- processSiteData(url, body)
} else {
fmt.Printf("Worker %d: URL %s failed, re-enqueuing.\n", id, url)
failed <- url
}
}
fmt.Printf("Worker %d finished.\n", id)
}
// crawl 主调度函数
func crawl(initialURLs []string) {
numWorkers := 4
input := make(chan string)
failed := make(chan string)
output := make(chan SiteData)
done := make(chan struct{}) // 用于通知所有任务完成
var wg sync.WaitGroup // 用于等待所有worker协程完成
// 1. 启动工作协程
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i+1, input, output, failed, &wg)
}
// 2. 任务调度协程:负责分发URL和处理失败重试
go func() {
pendingURLs := make([]string, len(initialURLs))
copy(pendingURLs, initialURLs) // 复制初始URL列表
processedCount := 0
totalTasks := len(initialURLs) // 初始任务数
// 用于跟踪当前正在处理的任务数,以便判断何时所有任务完成
// 这里的逻辑需要更严谨,实际应该通过计数器追踪
// 为简化示例,假设当pendingURLs为空且没有新的失败任务时,所有任务完成
// 真正的完成判断需要考虑所有worker是否都已空闲
// 这里我们使用一个简单的计数器来模拟完成
var activeTasks int32 // 活跃任务数,包括正在处理和待处理的
// 初始化活跃任务数
for _, url := range initialURLs {
input <- url // 初始分发,这里是阻塞的,如果input无缓冲,可能需要调整
activeTasks++
}
close(input) // 初始URL分发完毕,关闭input通道,让worker知道何时停止
// 改进的调度器,使用一个单独的协程来管理URL列表
// 这样可以避免在主调度器中阻塞
go func() {
var currentURLs []string
currentURLs = append(currentURLs, initialURLs...)
// 确保所有初始URL都已发送到input
for _, url := range initialURLs {
input <- url
}
// 跟踪正在处理的URL数量
inFlight := 0
for {
select {
case url := <-failed: // 接收失败的URL
currentURLs = append(currentURLs, url)
inFlight-- // 失败任务不再in-flight
fmt.Printf("Scheduler: Received failed URL: %s, currentURLs len: %d, inFlight: %d\n", url, len(currentURLs), inFlight)
case input <- currentURLs[0]: // 尝试发送下一个URL
fmt.Printf("Scheduler: Sending URL: %s, currentURLs len: %d, inFlight: %d\n", currentURLs[0], len(currentURLs), inFlight)
currentURLs = currentURLs[1:]
inFlight++ // 成功发送,in-flight任务增加
if len(currentURLs) == 0 && inFlight == 0 {
// 所有URL都已处理完毕,且没有正在进行中的任务
close(input) // 关闭input通道,通知worker停止
return
}
}
}
}()
// 这是一个简化的调度器,更健壮的调度器需要更复杂的逻辑
// 实际应用中,需要一个机制来判断何时所有URL都已成功处理或重试次数耗尽
// 并且所有的worker都已完成。这里为了避免死锁,我们采用如下策略:
// 初始URL一次性发送,failed通道接收的URL会重新进入队列。
// 当input通道关闭后,worker会退出。
// 这里需要一个更精细的调度器,来动态管理 `input` 和 `failed`
// 让我们重写这部分,以避免死锁并允许优雅退出
go func() {
var urlsToProcess []string
urlsToProcess = append(urlsToProcess, initialURLs...)
// 用于在没有URL可发送时,等待失败URL
sendOrReceive := func() {
if len(urlsToProcess) > 0 {
select {
case input <- urlsToProcess[0]:
urlsToProcess = urlsToProcess[1:]
case url := <-failed:
urlsToProcess = append(urlsToProcess, url)
}
} else {
// 如果没有待处理URL,则只监听failed通道
// 这里是关键:防止在没有URL时阻塞在input <-
url := <-failed
urlsToProcess = append(urlsToProcess, url)
}
}
// 持续调度,直到所有任务完成
// 这里需要一个更精细的WaitGroup来跟踪所有任务的状态
// 为了避免死锁,我们暂时让这个调度器一直运行
// 直到main函数通过done通道通知其退出
// 这是一个简化的版本,实际需要一个计数器来跟踪in-flight任务
for {
select {
case <-done: // 收到退出信号
close(input) // 关闭input通道,通知worker退出
return
default:
sendOrReceive()
}
}
}()
}()
// 3. 结果收集协程
go func() {
totalResults := 0
for range output { // 接收所有结果
totalResults++
// saveToDB(data) // 已经在worker中模拟保存了,这里只是计数
}
fmt.Printf("Collected %d results.\n", totalResults)
// 当output通道关闭时,表示所有结果都已收集
close(done) // 通知调度器可以退出
}()
// 等待所有worker协程完成
wg.Wait()
close(output) // 所有worker都已退出,关闭output通道
// 等待结果收集协程和调度器协程完成
<-done
fmt.Println("Crawl finished.")
}
func main() {
urlList := []string{
"http://example.com/page1",
"http://example.com/page2",
"http://example.com/fail1", // 模拟失败
"http://example.com/page3",
"http://example.com/page4",
"http://example.com/fail2", // 模拟失败
"http://example.com/page5",
}
crawl(urlList)
}代码解析:
worker函数改动:
crawl函数改动:
结果收集协程:
这种设计确保了:
以上就是Go并发爬虫:解决通道死锁导致的停滞问题的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号