0

0

运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

心靈之曲

心靈之曲

发布时间:2025-10-22 10:03:30

|

805人浏览过

|

来源于php中文网

原创

运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

本文详细阐述了如何在fastapi应用中,利用其`lifespan`事件管理器,高效且优雅地集成多个异步tcp服务器。通过正确使用`asyncio.create_task`在应用启动时启动后台服务,并在应用关闭时实现这些服务的平滑终止,确保fastapi与自定义tcp服务在同一个事件循环中协同工作,实现数据从tcp到websocket的无缝转发。

1. 引言:FastAPI与异步服务的融合

在构建现代异步应用时,我们常常需要将Web服务(如基于FastAPI)与自定义的后台服务(如TCP服务器)结合起来。FastAPI以其高性能和异步特性而闻名,而Python的asyncio库则为构建并发网络应用提供了强大的支持。本文将探讨如何在同一个Python进程和事件循环中,无缝地运行一个FastAPI应用和多个异步TCP服务器,并实现数据在它们之间的流转,例如将TCP接收到的数据通过WebSocket广播给客户端。

2. 理解FastAPI的Lifespan事件管理器

FastAPI提供了lifespan事件管理器,这是一个基于contextlib.asynccontextmanager的强大工具,用于在应用程序启动和关闭时执行异步操作。其核心在于yield关键字:

  • yield之前的部分:在应用程序启动时执行。这里适合进行资源初始化、数据库连接、启动后台任务等操作。
  • yield之后的部分:在应用程序关闭时执行。这里适合进行资源清理、关闭连接、停止后台任务等操作。

常见误区:将需要持续运行的后台任务的启动逻辑放置在yield之后。这样做会导致任务仅在应用程序关闭时才尝试启动,而非在应用程序运行期间。

3. 正确集成异步TCP服务器的策略

为了让TCP服务器与FastAPI应用同时运行,并共享同一个事件循环,我们需要遵循以下策略:

  1. 使用 asyncio.create_task() 启动后台任务:将TCP服务器的启动协程包装成一个asyncio.Task。这会立即调度协程在事件循环中运行,而不会阻塞lifespan的启动流程。
  2. 在 yield 之前启动任务:确保所有需要随应用生命周期运行的后台任务都在lifespan的yield语句之前被创建并启动。
  3. 在 yield 之后实现优雅关闭:当应用收到关闭信号时(例如Ctrl+C或进程终止),lifespan的yield之后的部分会被执行。此时,我们应该取消之前启动的后台任务,并等待它们完成清理工作,以确保资源被正确释放。

4. 完整的代码示例

以下是根据上述策略修改后的代码,包括server.py, globals.py, websocket_manager.py 和 main.py。

websocket_manager.py (WebSocket连接管理)

# websocket_manager.py
from fastapi import WebSocket
from typing import List

class WebSocketManager:
    """
    管理活跃的WebSocket连接,并提供广播功能。
    """
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """建立WebSocket连接并将其添加到活跃连接列表。"""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """从活跃连接列表中移除断开的WebSocket连接。"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, data: str):
        """向所有活跃的WebSocket连接广播数据。"""
        # 遍历时创建一个副本以避免在迭代过程中修改列表
        for connection in list(self.active_connections):
            try:
                await connection.send_text(data)
            except Exception as e:
                print(f"Error broadcasting to WebSocket: {e}. Disconnecting...")
                self.disconnect(connection) # 广播失败则断开连接

globals.py (全局变量)

# globals.py
import threading
from websocket_manager import WebSocketManager

# 示例:全局数据存储和锁(当前示例中未使用,但保留结构)
data_storage = {}
data_lock = threading.Lock() # 注意:在asyncio环境中,通常应使用asyncio.Lock

# WebSocket管理器实例,供其他模块访问
websocket_manager = WebSocketManager()

server.py (异步TCP服务器)

# server.py
import asyncio
import globals

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """
    处理单个TCP客户端连接。
    从客户端读取数据,并通过WebSocketManager广播。
    """
    peername = writer.get_extra_info('peername')
    print(f"TCP client connected from {peername}")
    try:
        while True:
            data = await reader.read(1024) # 读取最多1024字节
            if not data:
                print(f"TCP client {peername} disconnected.")
                break
            # 将接收到的原始数据解码为UTF-8字符串并广播
            message = data.decode('utf-8', errors='ignore')
            print(f"Received from TCP {peername}: {message}")
            await globals.websocket_manager.broadcast(message)
    except asyncio.CancelledError:
        print(f"TCP client handler for {peername} cancelled.")
    except Exception as e:
        print(f"Error handling TCP client {peername}: {e}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"TCP client writer for {peername} closed.")

async def run_tcp_server_task(port: int):
    """
    启动一个TCP服务器,并在事件循环中运行。
    此函数设计为可取消的后台任务。
    """
    server = None
    try:
        print(f"Starting TCP server on 0.0.0.0:{port}...")
        server = await asyncio.start_server(handle_client, '0.0.0.0', port)
        async with server:
            await server.serve_forever() # 阻塞直到任务被取消
    except asyncio.CancelledError:
        print(f"TCP server on port {port} task cancelled.")
    except Exception as e:
        print(f"Error in TCP server on port {port}: {e}")
    finally:
        if server:
            server.close() # 关闭服务器套接字
            await server.wait_closed() # 等待服务器完全关闭
            print(f"TCP server on port {port} closed.")

main.py (FastAPI应用入口)

# main.py
from fastapi import FastAPI, WebSocket
import asyncio
from contextlib import asynccontextmanager
import globals # 导入全局变量
from server import run_tcp_server_task # 导入TCP服务器启动函数

@asynccontextmanager
async def startup_event(app: FastAPI):
    """
    FastAPI应用的生命周期事件管理器。
    在应用启动时启动TCP服务器,在应用关闭时停止它们。
    """
    print("Application startup: Initializing and starting background tasks...")

    # 定义需要启动的TCP服务器端口
    ports = [8001, 8002, 8003]
    # 为每个TCP服务器创建一个后台任务
    # 这些任务会在当前事件循环中并发运行
    tcp_server_tasks = [asyncio.create_task(run_tcp_server_task(port)) for port in ports]

    # `yield` 标志着应用启动完成,可以开始处理请求
    yield

    # `yield` 之后的部分在应用关闭时执行
    print("Application shutdown: Stopping background tasks...")
    # 取消所有TCP服务器任务
    for task in tcp_server_tasks:
        task.cancel()
    # 等待所有任务完成取消和清理工作
    # `return_exceptions=True` 确保即使有任务取消失败也不会阻塞其他任务
    await asyncio.gather(*tcp_server_tasks, return_exceptions=True)
    print("All background tasks stopped gracefully.")

# 使用lifespan事件管理器创建FastAPI应用
app = FastAPI(lifespan=startup_event)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    FastAPI的WebSocket端点。
    管理WebSocket连接,并在连接断开时进行清理。
    """
    print("WebSocket connection established.")
    await globals.websocket_manager.connect(websocket)
    try:
        # 保持WebSocket连接活跃,或处理来自客户端的WebSocket消息
        # 在此示例中,我们只接收消息以保持连接开放,实际应用中可能需要处理消息
        while True:
            await websocket.receive_text()
    except Exception as e:
        print(f"WebSocket Error: {e}")
    finally:
        globals.websocket_manager.disconnect(websocket)
        print("WebSocket connection closed.")

# 运行应用:uvicorn main:app --reload

5. 代码工作原理与注意事项

  1. lifespan的正确使用

    智川X-Agent
    智川X-Agent

    中科闻歌推出的一站式AI智能体开发平台

    下载
    • 在main.py的startup_event中,asyncio.create_task(run_tcp_server_task(port))在yield之前被调用。这确保了TCP服务器任务在FastAPI应用启动时立即开始运行,并作为后台任务与FastAPI的HTTP/WebSocket服务共享同一个事件循环。
    • 当应用收到关闭信号时,yield之后的代码块被执行。此时,我们通过task.cancel()向每个TCP服务器任务发送取消信号,并使用asyncio.gather(*tcp_server_tasks, return_exceptions=True)等待所有任务优雅地完成其清理工作。
  2. TCP服务器的优雅关闭

    • run_tcp_server_task函数内部使用了await server.serve_forever(),这是一个阻塞调用。当其所在的任务被取消时,它会抛出asyncio.CancelledError。
    • try...except asyncio.CancelledError...finally块确保了即使任务被取消,server.close()和await server.wait_closed()也能被执行,从而正确关闭TCP服务器的套接字。
  3. WebSocketManager

    • 负责管理所有活跃的WebSocket连接。当TCP服务器接收到数据时,它会通过globals.websocket_manager.broadcast()将数据发送给所有连接的WebSocket客户端。
    • 在广播过程中,加入了错误处理,如果向某个WebSocket发送数据失败,会将其从活跃连接中移除,提高健壮性。
  4. 全局变量 (globals.py)

    • 用于在不同模块间共享WebSocketManager实例。
    • 注意,如果需要共享其他可变状态并在多个异步任务中访问,应优先使用asyncio.Lock而非threading.Lock,以避免阻塞事件循环。在本例中,data_storage和data_lock未被实际使用。
  5. 运行应用

    • 使用uvicorn main:app --reload命令即可启动FastAPI应用。Uvicorn会自动管理事件循环,并执行lifespan事件。

6. 总结

通过本文的指导,我们学习了如何利用FastAPI的lifespan事件管理器,在同一个事件循环中有效地运行FastAPI应用和多个异步TCP服务器。关键在于理解yield的语义,并使用asyncio.create_task来调度后台任务,同时实现任务的优雅启动和关闭。这种模式对于构建需要集成多种网络服务类型的复杂异步应用至关重要,它确保了资源的有效利用和应用的健壮性。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

28

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

251

2026.02.06

全局变量怎么定义
全局变量怎么定义

本专题整合了全局变量相关内容,阅读专题下面的文章了解更多详细内容。

93

2025.09.18

python 全局变量
python 全局变量

本专题整合了python中全局变量定义相关教程,阅读专题下面的文章了解更多详细内容。

106

2025.09.18

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

385

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2111

2023.08.14

vb怎么连接数据库
vb怎么连接数据库

在VB中,连接数据库通常使用ADO(ActiveX 数据对象)或 DAO(Data Access Objects)这两个技术来实现:1、引入ADO库;2、创建ADO连接对象;3、配置连接字符串;4、打开连接;5、执行SQL语句;6、处理查询结果;7、关闭连接即可。

357

2023.08.31

MySQL恢复数据库
MySQL恢复数据库

MySQL恢复数据库的方法有使用物理备份恢复、使用逻辑备份恢复、使用二进制日志恢复和使用数据库复制进行恢复等。本专题为大家提供MySQL数据库相关的文章、下载、课程内容,供大家免费下载体验。

259

2023.09.05

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

76

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 4.9万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号