0

0

Python Socket服务器多客户端并发与完成信号处理教程

花韻仙語

花韻仙語

发布时间:2025-11-30 11:37:00

|

846人浏览过

|

来源于php中文网

原创

Python Socket服务器多客户端并发与完成信号处理教程

本教程详细阐述了如何使用python的`select`模块构建一个能够同时处理多种类型客户端连接的socket服务器。服务器将有效地监听多个客户端的传入消息,并在所有预期的客户端发送“complete”信号后,执行特定操作并优雅关闭。文章通过示例代码和最佳实践,指导开发者实现高效、非阻塞的多客户端通信管理。

引言:多客户端并发通信挑战

在网络编程中,构建一个能够同时服务多个客户端的服务器是常见的需求。特别是在某些场景下,服务器不仅需要接收来自不同类型客户端的数据,还需要等待所有客户端完成其特定的消息发送流程(例如,发送一个“complete”信号)后,才能执行下一步操作。

传统的阻塞式Socket服务器通常采用为每个新连接创建一个新线程或进程的方式来处理并发。然而,这种方法在处理客户端断开、重连或需要精确管理特定完成信号的场景时,可能会遇到挑战。例如,如果客户端在发送“complete”前断开,或多个客户端尝试复用同一个处理线程,都可能导致逻辑混乱或资源浪费。

本文将探讨如何使用Python标准库中的select模块,以一种非阻塞、高效的方式解决上述问题,实现一个能够监听多种客户端类型并等待所有客户端完成特定任务的Socket服务器。

解决方案:使用 select 模块进行I/O多路复用

select模块提供了一种I/O多路复用机制,允许单个线程同时监听多个Socket连接,并在任何一个Socket准备好读写或发生异常时得到通知。这种机制非常适合处理大量并发连接,因为它避免了为每个连接创建独立线程的开销,并能更精细地控制连接状态。

立即学习Python免费学习笔记(深入)”;

select 模块的工作原理

select.select(rlist, wlist, xlist, timeout) 函数是其核心。它接收三个列表:

  • rlist:包含需要监听读事件的Socket对象(例如,有新数据到达或有新连接请求)。
  • wlist:包含需要监听写事件的Socket对象(例如,可以发送数据)。
  • xlist:包含需要监听异常事件的Socket对象。
  • timeout:可选参数,指定等待事件的超时时间(秒)。如果为None,则阻塞直到有事件发生;如果为0,则立即返回。

函数返回三个列表:readable, writable, exceptional,分别对应发生读、写、异常事件的Socket对象。

服务器端实现步骤

我们将构建一个服务器,它能够:

  1. 监听新的客户端连接。
  2. 接收来自已连接客户端的数据。
  3. 识别客户端发送的“complete”信号。
  4. 当所有预期的客户端都发送“complete”信号后,关闭服务器。

以下是使用select模块实现此功能的详细代码和解释:

WowTo
WowTo

用AI建立视频知识库

下载
import socket
import select
import logging
import sys

# 配置日志,便于调试
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def start_multi_client_completion_server(ip, port, expected_client_count):
    """
    启动一个Socket服务器,监听多个客户端连接,并等待所有客户端发送“complete”信号。

    Args:
        ip (str): 服务器绑定的IP地址。
        port (int): 服务器监听的端口。
        expected_client_count (int): 预期发送“complete”信号的客户端数量。
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setblocking(False) # 设置为非阻塞模式

    try:
        server_socket.bind((ip, port))
        server_socket.listen(5) # 允许5个待处理的连接请求
        logging.info(f"服务器已启动,监听在 {ip}:{port},等待 {expected_client_count} 个客户端完成。")
    except socket.error as e:
        logging.error(f"服务器启动失败: {e}")
        sys.exit(1)

    # inputs 列表用于存放所有需要select监听读事件的socket对象
    # 初始时只包含服务器自身的socket,用于监听新连接
    inputs = [server_socket]

    # complete_clients_count 用于记录已发送“complete”信号的客户端数量
    complete_clients_count = 0

    try:
        while True:
            # 使用 select.select() 监听可读事件
            # timeout=10 表示最长阻塞10秒,如果没有事件发生,则循环继续
            readable, _, exceptional = select.select(inputs, [], inputs, 10)

            # 如果在超时时间内没有任何事件发生
            if not (readable or exceptional):
                logging.info("等待客户端连接或数据中 (10秒超时)...")
                continue

            for s in readable:
                if s is server_socket:
                    # 如果是服务器socket可读,说明有新的客户端连接请求
                    conn, addr = s.accept()
                    conn.setblocking(False) # 新连接也设置为非阻塞
                    inputs.append(conn) # 将新连接加入到监听列表中
                    logging.info(f"接受新连接: {addr}")
                else:
                    # 如果是客户端socket可读,说明有数据到达
                    try:
                        data = s.recv(1024).decode('utf8').strip()
                        if data:
                            logging.info(f"收到来自 {s.getpeername()} 的数据: '{data}'")
                            if data == 'complete':
                                complete_clients_count += 1
                                logging.info(f"客户端 {s.getpeername()} 发送了 'complete'。当前完成数: {complete_clients_count}/{expected_client_count}")
                                # 客户端发送'complete'后,可以将其从监听列表中移除,因为它已完成任务
                                inputs.remove(s)
                                s.close() # 关闭该客户端连接
                        else:
                            # 客户端断开连接 (recv返回空字节)
                            logging.info(f"客户端 {s.getpeername()} 断开连接。")
                            inputs.remove(s)
                            s.close()
                    except ConnectionResetError:
                        # 客户端突然断开连接
                        logging.warning(f"客户端 {s.getpeername()} 异常断开。")
                        inputs.remove(s)
                        s.close()
                    except Exception as e:
                        logging.error(f"处理客户端 {s.getpeername()} 数据时发生错误: {e}")
                        inputs.remove(s)
                        s.close()

            for s in exceptional:
                # 处理异常情况,例如客户端异常断开
                logging.error(f"客户端 {s.getpeername()} 发生异常。")
                inputs.remove(s)
                s.close()

            # 检查是否所有预期的客户端都已发送“complete”信号
            if complete_clients_count >= expected_client_count:
                logging.info(f"所有 {expected_client_count} 个客户端已发送 'complete' 信号。服务器完成任务。")
                break # 退出主循环

    except KeyboardInterrupt:
        logging.info("服务器被用户中断。")
    except Exception as e:
        logging.error(f"服务器运行过程中发生未预期错误: {e}")
    finally:
        # 清理所有打开的socket
        for s in inputs:
            s.close()
        logging.info("服务器已关闭。")

# 示例使用
if __name__ == "__main__":
    SERVER_IP = '127.0.0.1' # 本地回环地址
    SERVER_PORT = 12345
    EXPECTED_CLIENTS = 2 # 假设我们预期2个客户端发送'complete'

    # 启动服务器在一个单独的线程中,以便主线程可以运行客户端示例
    import threading
    server_thread = threading.Thread(target=start_multi_client_completion_server, 
                                     args=(SERVER_IP, SERVER_PORT, EXPECTED_CLIENTS))
    server_thread.start()

    # 简单客户端模拟 (类型1: 连续发送,最后发送complete)
    def client_type1(client_id, messages_to_send):
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client_socket.connect((SERVER_IP, SERVER_PORT))
            logging.info(f"客户端 {client_id} (类型1) 已连接。")
            for i in range(messages_to_send):
                msg = f"Client{client_id}_Msg_{i}"
                client_socket.sendall(msg.encode('utf8'))
                logging.info(f"客户端 {client_id} 发送: {msg}")
                import time
                time.sleep(0.1)
            client_socket.sendall(b'complete')
            logging.info(f"客户端 {client_id} 发送 'complete'。")
        except socket.error as e:
            logging.error(f"客户端 {client_id} (类型1) 连接或发送失败: {e}")
        finally:
            client_socket.close()
            logging.info(f"客户端 {client_id} (类型1) 已关闭。")

    # 简单客户端模拟 (类型2: 每次发送后断开,最后一次连接发送complete)
    def client_type2(client_id, messages_to_send):
        for i in range(messages_to_send):
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                client_socket.connect((SERVER_IP, SERVER_PORT))
                msg = f"Client{client_id}_Msg_{i}"
                client_socket.sendall(msg.encode('utf8'))
                logging.info(f"客户端 {client_id} 发送: {msg} (并断开)")
            except socket.error as e:
                logging.error(f"客户端 {client_id} (类型2) 连接或发送失败: {e}")
            finally:
                client_socket.close()
            import time
            time.sleep(0.2)

        # 最后一次连接发送 'complete'
        final_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            final_client_socket.connect((SERVER_IP, SERVER_PORT))
            final_client_socket.sendall(b'complete')
            logging.info(f"客户端 {client_id} 发送 'complete' (最后一次连接)。")
        except socket.error as e:
            logging.error(f"客户端 {client_id} (类型2) 最终发送失败: {e}")
        finally:
            final_client_socket.close()
            logging.info(f"客户端 {client_id} (类型2) 已关闭。")


    # 启动客户端
    # 注意: 如果EXPECTED_CLIENTS为2,这里需要启动两个客户端
    # 确保客户端数量与 EXPECTED_CLIENTS 匹配,否则服务器会一直等待
    client1_thread = threading.Thread(target=client_type1, args=(1, 3))
    client2_thread = threading.Thread(target=client_type2, args=(2, 2))

    client1_thread.start()
    client2_thread.start()

    client1_thread.join()
    client2_thread.join()
    server_thread.join() # 等待服务器线程结束
    logging.info("所有客户端和服务器线程均已结束。")

代码解析

  1. 初始化服务器Socket

    • server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM):创建TCP Socket。
    • server_socket.setblocking(False):关键一步,将服务器Socket设置为非阻塞模式。这是select机制能够工作的基础。
    • server_socket.bind((ip, port)) 和 server_socket.listen(5):绑定地址并开始监听连接。
  2. inputs 列表

    • 这个列表是select.select()函数第一个参数的核心。它包含了所有当前服务器需要监听读事件的Socket对象。
    • 初始时,inputs只包含server_socket,因为服务器首先需要监听新的客户端连接请求。
    • 每当有新的客户端连接时,server_socket.accept()返回的客户端连接Socket (conn) 也会被添加到inputs列表中,以便监听该客户端的数据。
  3. 主循环 (while True)

    • readable, _, exceptional = select.select(inputs, [], inputs, 10):这是I/O多路复用的核心。它会阻塞最多10秒,等待inputs列表中任何一个Socket变得可读(有新数据、新连接或连接断开)或发生异常。
    • 处理新连接:如果server_socket在readable列表中,说明有新的客户端尝试连接。server_socket.accept()接受连接,并将新的客户端Socket conn 添加到inputs中。注意:新接受的conn也应设置为非阻塞模式。
    • 处理客户端数据:如果客户端Socket在readable列表中,说明该客户端发送了数据。
      • s.recv(1024)接收数据。
      • 如果data非空,则打印数据。
      • 如果data == 'complete',则complete_clients_count加1,并将该客户端Socket从inputs中移除并关闭,因为它已完成任务。
      • 如果data为空,表示客户端已正常断开连接,同样将其从inputs中移除并关闭。
      • ConnectionResetError 捕获客户端异常断开的情况。
    • 处理异常:exceptional列表中的Socket表示发生了错误,应进行清理和关闭。
  4. 完成计数与退出

    • if complete_clients_count >= expected_client_count::在每次循环结束时,检查已完成任务的客户端数量是否达到预期。如果达到,则服务器完成其使命,跳出循环并关闭。
  5. 资源清理

    • finally块确保在服务器关闭或发生异常时,所有打开的Socket都被正确关闭,防止资源泄露。

客户端模拟

示例中提供了两种客户端类型:

  • 类型1:连接一次,连续发送多条消息,最后发送“complete”并关闭。
  • 类型2:每次发送一条消息后断开连接,重复多次,最后一次连接发送“complete”并关闭。

这两种客户端类型都很好地展示了select服务器如何灵活地处理不同的连接模式和消息流。

注意事项与最佳实践

  1. 非阻塞模式:所有要被select监听的Socket都必须设置为非阻塞模式 (socket.setblocking(False)),否则recv或accept等操作可能会阻塞整个程序。
  2. 错误处理:网络通信中错误无处不在。务必使用try-except块捕获socket.error、ConnectionResetError等异常,并进行适当的日志记录和清理。
  3. 数据协议:本例中以字符串“complete”作为完成信号。在实际应用中,可能需要更复杂的数据协议,例如包含消息长度前缀、JSON或Protobuf等,以确保消息的完整性和可靠性。
  4. select的局限性:select在处理数千个并发连接时表现良好,但当连接数量达到数万甚至更高时,其性能可能会下降,因为它需要遍历所有监听的Socket。在Linux系统上,epoll通常是更高性能的选择;在BSD/macOS上,是kqueue。Python的selectors模块提供了一个统一的接口来使用这些底层机制。
  5. 超时设置:select.select()的timeout参数非常重要。如果设置为None,服务器将无限期阻塞直到有事件发生;如果设置为0,则会立即返回,这可能导致CPU空转。设置一个合理的超时时间(如本例中的10秒)可以使服务器周期性地执行其他任务或检查状态,而不会长时间阻塞。
  6. expected_client_count:这个变量决定了服务器何时关闭。在实际应用中,这个数量可能不是固定的,可能需要通过其他机制(例如,一个特定的管理客户端发送一个“任务开始”信号,并告知预期客户端数量)来动态确定。
  7. 日志记录:使用logging模块可以帮助开发者更好地理解服务器的运行状态,并在出现问题时进行调试。

总结

通过本文,我们学习了如何使用Python的select模块构建一个高效、非阻塞的Socket服务器,以应对多客户端并发连接并等待所有客户端完成特定任务的场景。select机制通过I/O多路复用,避免了传统多线程/多进程模型在资源消耗和管理上的复杂性,为中等规模的并发网络应用提供了一个健壮的解决方案。理解并掌握select模块,是Python网络编程中一项重要的技能。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

769

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

661

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

764

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

639

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1305

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

549

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

579

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

709

2023.08.11

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

19

2026.01.20

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PostgreSQL 教程
PostgreSQL 教程

共48课时 | 7.5万人学习

Git 教程
Git 教程

共21课时 | 2.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号