使用Lock保护状态变更,结合Condition和队列实现线程安全的状态机,确保多线程下状态切换的安全与逻辑清晰。

在多线程环境中实现状态机,核心是保证状态切换的安全性和逻辑清晰。Python 的 threading 模块提供了基础支持,但要控制复杂逻辑,需结合同步机制与良好的设计模式。
使用线程安全的状态机类
状态机本质是维护一个当前状态,并根据输入或事件进行转移。在多线程中,多个线程可能同时尝试触发状态转移,必须防止竞态条件。
通过 threading.Lock 保护状态变更操作,确保任意时刻只有一个线程能修改状态。
立即学习“Python免费学习笔记(深入)”;
import threading
<p>class ThreadSafeStateMachine:
def <strong>init</strong>(self):
self._state = "idle"
self._lock = threading.Lock()</p><pre class='brush:python;toolbar:false;'>def transition(self, event):
with self._lock:
if self._state == "idle" and event == "start":
self._state = "running"
print("State → running")
elif self._state == "running" and event == "stop":
self._state = "idle"
print("State → idle")
else:
print(f"Ignore event {event} in state {self._state}")
def get_state(self):
with self._lock:
return self._state用 Condition 实现状态等待与通知
某些场景下,线程需要等待状态达到某个条件才继续执行。例如:工作线程等待“运行”状态,主线程控制启停。
threading.Condition 可以让线程等待特定状态,状态变化后主动唤醒。
import time
<p>class ControlledWorker:
def <strong>init</strong>(self):
self.state = "paused"
self.condition = threading.Condition()</p><pre class='brush:python;toolbar:false;'>def worker_loop(self):
while True:
with self.condition:
while self.state == "paused":
self.condition.wait() # 等待被唤醒
if self.state == "stopped":
break
print("Working...")
time.sleep(1)
print("Worker stopped.")
def start(self):
with self.condition:
self.state = "running"
self.condition.notify_all()
def pause(self):
with self.condition:
self.state = "paused"
def stop(self):
with self.condition:
self.state = "stopped"
self.condition.notify_all()结合队列实现事件驱动状态转移
对于更复杂的逻辑,可引入 queue.Queue 作为事件通道。状态机从队列中消费事件,避免多线程直接调用带来的混乱。
这种方式解耦了事件产生与处理,适合高并发或异步任务场景。
import queue
import threading
<p>def event_driven_fsm():
fsm_queue = queue.Queue()
state = "idle"</p><pre class='brush:python;toolbar:false;'>def fsm_worker():
nonlocal state
while True:
event = fsm_queue.get()
if event == "quit":
break
with threading.Lock(): # 状态本身仍需保护
if state == "idle" and event == "init":
state = "ready"
elif state == "ready" and event == "run":
state = "running"
elif event == "reset":
state = "idle"
print(f"[FSM] State: {state}, Event: {event}")
fsm_queue.task_done()
threading.Thread(target=fsm_worker, daemon=True).start()
return fsm_queue使用示例
q = event_driven_fsm() q.put("init") q.put("run") q.put("reset") q.put("quit") q.join() # 等待处理完成
技巧总结
- 始终用 Lock 或 with 语句保护共享状态变量
- 避免在状态转移中执行耗时操作,防止阻塞其他线程
- 优先使用事件队列而非直接跨线程调用方法
- 考虑使用 Enum 定义状态值,提升可读性与安全性
- 调试时加入日志记录状态变化和线程 ID,便于追踪问题
基本上就这些。关键是把状态变更变成串行化操作,再通过合适的同步原语协调线程行为。结构清晰了,复杂逻辑也能可控。







