
本文介绍一种避免为无效数据块创建冗余线程的并发优化方案——通过 threading.event 实时传递“数据结束”信号,并结合分批提交(batched submission)控制线程生成节奏,显著降低资源浪费。
本文介绍一种避免为无效数据块创建冗余线程的并发优化方案——通过 threading.event 实时传递“数据结束”信号,并结合分批提交(batched submission)控制线程生成节奏,显著降低资源浪费。
在调用分页式或块式 API 时,常见场景是:请求块号 0, 1, 2, ... 直至某次返回空(None 或空列表),即表示后续所有块均无数据。但若直接预提交全部 max_blocks 个任务(如 1000 个),而实际仅有前 39 块有效,则至少浪费 961 次线程调度与网络等待开销——这不仅拖慢整体执行,还可能触发服务端限流或本地连接池耗尽。
原代码的问题在于静态全量提交:
futures = {executor.submit(func, step) for i in range(maxBlocks)} # ❌ 一次性创建全部任务它无法感知中间响应结果,更无法及时中止后续任务提交。
✅ 正确思路是:边执行、边判断、边决策。核心依赖两个机制:
- 共享终止信号(threading.Event):任一工作线程发现 None 响应时,立即设置该事件,通知主线程停止提交新任务;
- 分批提交 + 同步等待(Batched Submission with Backpressure):不连续提交所有任务,而是按固定批次(如每 10 个块为一批)提交;每批末尾检查终止信号,若已触发则跳出循环;否则可短暂等待(event.wait(timeout))确保本批结果充分返回,再启动下一批——实现资源利用与浪费之间的可控平衡。
以下是精简可复用的实现示例:
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Event
logging.basicConfig(level=logging.INFO, format="%(levelname)-8s | %(message)s")
def fetch_block(step: int, done_event: Event) -> list | None:
"""模拟 API 请求:step 超过真实数据上限时返回 None 并标记结束"""
time.sleep(0.1) # 模拟网络延迟
if step >= 27: # 假设真实数据仅到 block-26
done_event.set()
return None
return [f"item_{step}_1", f"item_{step}_2"]
def parallel_fetch_blocks(
max_blocks: int = 1000,
batch_size: int = 10,
max_workers: int = 5,
timeout_per_batch: float = 3.0
) -> list:
done_event = Event()
futures = {} # step → Future
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for step in range(max_blocks):
if done_event.is_set():
break # 终止信号已触发,不再提交新任务
# 批次控制:每 batch_size 步暂停并确认状态
if step > 0 and step % batch_size == 0:
# 等待当前批次尽可能完成,同时响应终止信号
done_event.wait(timeout=timeout_per_batch)
futures[step] = executor.submit(fetch_block, step, done_event)
# 收集结果:取首个 None 出现前的所有非空结果
blocks_data = []
for step in sorted(futures.keys()):
result = futures[step].result()
if result is None:
break
blocks_data.extend(result)
return blocks_data
# 使用示例
if __name__ == "__main__":
data = parallel_fetch_blocks(max_blocks=100, batch_size=10)
print(f"成功获取 {len(data)} 条数据(共 {len(data)//2} 个数据块)")? 关键注意事项:
- batch_size 的权衡:值越小,浪费线程越少(最坏浪费 batch_size−1 个),但批次切换开销上升;建议从 5–20 开始压测调整;
- timeout_per_batch 不是硬性等待:event.wait(timeout) 是非阻塞检查,超时后继续执行,不会卡死;它只是给已提交任务留出合理完成窗口;
- 结果收集必须有序:因 as_completed() 无序,我们按 step 键排序遍历 futures,确保块顺序一致;
- 异常处理增强建议:生产环境应在 fetch_block 中捕获网络异常(如 requests.RequestException),并统一返回 None 或抛出特定异常后由主线程处理;
- 替代方案提示:若 API 支持「总条数预估」或「next_cursor」机制,优先采用游标式分页,比块号轮询更健壮。
该方案将线程浪费从 O(N) 降至 O(batch_size),在不确定数据边界的大规模 API 抓取场景中,兼具效率、可控性与工程鲁棒性。









