
本教程详细阐述了如何使用python的`select`模块构建一个能够同时处理多种类型客户端连接的socket服务器。服务器将有效地监听多个客户端的传入消息,并在所有预期的客户端发送“complete”信号后,执行特定操作并优雅关闭。文章通过示例代码和最佳实践,指导开发者实现高效、非阻塞的多客户端通信管理。
在网络编程中,构建一个能够同时服务多个客户端的服务器是常见的需求。特别是在某些场景下,服务器不仅需要接收来自不同类型客户端的数据,还需要等待所有客户端完成其特定的消息发送流程(例如,发送一个“complete”信号)后,才能执行下一步操作。
传统的阻塞式Socket服务器通常采用为每个新连接创建一个新线程或进程的方式来处理并发。然而,这种方法在处理客户端断开、重连或需要精确管理特定完成信号的场景时,可能会遇到挑战。例如,如果客户端在发送“complete”前断开,或多个客户端尝试复用同一个处理线程,都可能导致逻辑混乱或资源浪费。
本文将探讨如何使用Python标准库中的select模块,以一种非阻塞、高效的方式解决上述问题,实现一个能够监听多种客户端类型并等待所有客户端完成特定任务的Socket服务器。
select模块提供了一种I/O多路复用机制,允许单个线程同时监听多个Socket连接,并在任何一个Socket准备好读写或发生异常时得到通知。这种机制非常适合处理大量并发连接,因为它避免了为每个连接创建独立线程的开销,并能更精细地控制连接状态。
立即学习“Python免费学习笔记(深入)”;
select.select(rlist, wlist, xlist, timeout) 函数是其核心。它接收三个列表:
函数返回三个列表:readable, writable, exceptional,分别对应发生读、写、异常事件的Socket对象。
我们将构建一个服务器,它能够:
以下是使用select模块实现此功能的详细代码和解释:
import socket
import select
import logging
import sys
# 配置日志,便于调试
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def start_multi_client_completion_server(ip, port, expected_client_count):
"""
启动一个Socket服务器,监听多个客户端连接,并等待所有客户端发送“complete”信号。
Args:
ip (str): 服务器绑定的IP地址。
port (int): 服务器监听的端口。
expected_client_count (int): 预期发送“complete”信号的客户端数量。
"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setblocking(False) # 设置为非阻塞模式
try:
server_socket.bind((ip, port))
server_socket.listen(5) # 允许5个待处理的连接请求
logging.info(f"服务器已启动,监听在 {ip}:{port},等待 {expected_client_count} 个客户端完成。")
except socket.error as e:
logging.error(f"服务器启动失败: {e}")
sys.exit(1)
# inputs 列表用于存放所有需要select监听读事件的socket对象
# 初始时只包含服务器自身的socket,用于监听新连接
inputs = [server_socket]
# complete_clients_count 用于记录已发送“complete”信号的客户端数量
complete_clients_count = 0
try:
while True:
# 使用 select.select() 监听可读事件
# timeout=10 表示最长阻塞10秒,如果没有事件发生,则循环继续
readable, _, exceptional = select.select(inputs, [], inputs, 10)
# 如果在超时时间内没有任何事件发生
if not (readable or exceptional):
logging.info("等待客户端连接或数据中 (10秒超时)...")
continue
for s in readable:
if s is server_socket:
# 如果是服务器socket可读,说明有新的客户端连接请求
conn, addr = s.accept()
conn.setblocking(False) # 新连接也设置为非阻塞
inputs.append(conn) # 将新连接加入到监听列表中
logging.info(f"接受新连接: {addr}")
else:
# 如果是客户端socket可读,说明有数据到达
try:
data = s.recv(1024).decode('utf8').strip()
if data:
logging.info(f"收到来自 {s.getpeername()} 的数据: '{data}'")
if data == 'complete':
complete_clients_count += 1
logging.info(f"客户端 {s.getpeername()} 发送了 'complete'。当前完成数: {complete_clients_count}/{expected_client_count}")
# 客户端发送'complete'后,可以将其从监听列表中移除,因为它已完成任务
inputs.remove(s)
s.close() # 关闭该客户端连接
else:
# 客户端断开连接 (recv返回空字节)
logging.info(f"客户端 {s.getpeername()} 断开连接。")
inputs.remove(s)
s.close()
except ConnectionResetError:
# 客户端突然断开连接
logging.warning(f"客户端 {s.getpeername()} 异常断开。")
inputs.remove(s)
s.close()
except Exception as e:
logging.error(f"处理客户端 {s.getpeername()} 数据时发生错误: {e}")
inputs.remove(s)
s.close()
for s in exceptional:
# 处理异常情况,例如客户端异常断开
logging.error(f"客户端 {s.getpeername()} 发生异常。")
inputs.remove(s)
s.close()
# 检查是否所有预期的客户端都已发送“complete”信号
if complete_clients_count >= expected_client_count:
logging.info(f"所有 {expected_client_count} 个客户端已发送 'complete' 信号。服务器完成任务。")
break # 退出主循环
except KeyboardInterrupt:
logging.info("服务器被用户中断。")
except Exception as e:
logging.error(f"服务器运行过程中发生未预期错误: {e}")
finally:
# 清理所有打开的socket
for s in inputs:
s.close()
logging.info("服务器已关闭。")
# 示例使用
if __name__ == "__main__":
SERVER_IP = '127.0.0.1' # 本地回环地址
SERVER_PORT = 12345
EXPECTED_CLIENTS = 2 # 假设我们预期2个客户端发送'complete'
# 启动服务器在一个单独的线程中,以便主线程可以运行客户端示例
import threading
server_thread = threading.Thread(target=start_multi_client_completion_server,
args=(SERVER_IP, SERVER_PORT, EXPECTED_CLIENTS))
server_thread.start()
# 简单客户端模拟 (类型1: 连续发送,最后发送complete)
def client_type1(client_id, messages_to_send):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((SERVER_IP, SERVER_PORT))
logging.info(f"客户端 {client_id} (类型1) 已连接。")
for i in range(messages_to_send):
msg = f"Client{client_id}_Msg_{i}"
client_socket.sendall(msg.encode('utf8'))
logging.info(f"客户端 {client_id} 发送: {msg}")
import time
time.sleep(0.1)
client_socket.sendall(b'complete')
logging.info(f"客户端 {client_id} 发送 'complete'。")
except socket.error as e:
logging.error(f"客户端 {client_id} (类型1) 连接或发送失败: {e}")
finally:
client_socket.close()
logging.info(f"客户端 {client_id} (类型1) 已关闭。")
# 简单客户端模拟 (类型2: 每次发送后断开,最后一次连接发送complete)
def client_type2(client_id, messages_to_send):
for i in range(messages_to_send):
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
client_socket.connect((SERVER_IP, SERVER_PORT))
msg = f"Client{client_id}_Msg_{i}"
client_socket.sendall(msg.encode('utf8'))
logging.info(f"客户端 {client_id} 发送: {msg} (并断开)")
except socket.error as e:
logging.error(f"客户端 {client_id} (类型2) 连接或发送失败: {e}")
finally:
client_socket.close()
import time
time.sleep(0.2)
# 最后一次连接发送 'complete'
final_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
final_client_socket.connect((SERVER_IP, SERVER_PORT))
final_client_socket.sendall(b'complete')
logging.info(f"客户端 {client_id} 发送 'complete' (最后一次连接)。")
except socket.error as e:
logging.error(f"客户端 {client_id} (类型2) 最终发送失败: {e}")
finally:
final_client_socket.close()
logging.info(f"客户端 {client_id} (类型2) 已关闭。")
# 启动客户端
# 注意: 如果EXPECTED_CLIENTS为2,这里需要启动两个客户端
# 确保客户端数量与 EXPECTED_CLIENTS 匹配,否则服务器会一直等待
client1_thread = threading.Thread(target=client_type1, args=(1, 3))
client2_thread = threading.Thread(target=client_type2, args=(2, 2))
client1_thread.start()
client2_thread.start()
client1_thread.join()
client2_thread.join()
server_thread.join() # 等待服务器线程结束
logging.info("所有客户端和服务器线程均已结束。")初始化服务器Socket:
inputs 列表:
主循环 (while True):
完成计数与退出:
资源清理:
示例中提供了两种客户端类型:
这两种客户端类型都很好地展示了select服务器如何灵活地处理不同的连接模式和消息流。
通过本文,我们学习了如何使用Python的select模块构建一个高效、非阻塞的Socket服务器,以应对多客户端并发连接并等待所有客户端完成特定任务的场景。select机制通过I/O多路复用,避免了传统多线程/多进程模型在资源消耗和管理上的复杂性,为中等规模的并发网络应用提供了一个健壮的解决方案。理解并掌握select模块,是Python网络编程中一项重要的技能。
以上就是Python Socket服务器多客户端并发与完成信号处理教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号