
在嵌入式系统或PC与外部硬件设备通信的场景中,串行通信(如UART、RS232/485)扮演着核心角色。当多个应用线程需要同时向同一串行设备发送查询或接收数据时,如何构建一个高层次的抽象层,以屏蔽底层的并发控制复杂性,成为了一个关键问题。本文将深入探讨这一挑战,并提供两种主流的解决方案。
理解串行通信的挑战:请求-响应协议与并发
大多数串行设备,特别是简单的从属设备,都遵循严格的请求-响应协议。这意味着设备在接收到一个查询(如“foo”或“bar”)后,会一直忙碌直到发送回相应的响应。在此期间,设备通常无法处理任何新的请求。因此,主机或主设备必须尊重并强制执行这一协议:一次请求消息的传输必须紧随其后是响应的接收(或适当的超时),在当前请求-响应周期完成之前,绝不能开始另一次请求消息的传输。
直接让多个线程并行地向串行端口写入和读取,如以下简化示例所示,将导致数据混乱或协议违规:
def get(query):
serial_port.write(query)
data = serial_port.read(8)
return data这种方式的问题不在于位或字节层面的数据混合(因为操作系统内核驱动程序会处理底层的I/O,线程无法直接访问硬件并同时发送位),而在于它没有强制执行请求-响应协议。如果一个线程在设备尚未响应前发送了另一个查询,或者在读取响应完成前另一个线程开始写入,都将导致通信失败或数据错乱。核心问题是缺乏同步机制来确保对串行端口的独占访问和协议的正确执行。
解决方案一:基于消息队列的专用串行通信线程
一种优雅且高度抽象的解决方案是引入一个专用的线程来管理串行端口的所有I/O操作。这个线程充当串行通信的“守门员”,负责处理所有发送到设备的请求和接收设备的响应。其他需要与串行设备交互的线程,不再直接操作串行端口,而是将它们的请求封装成消息,发送到一个共享的消息队列中。
工作原理:
- 请求队列: 主通信线程维护一个请求队列。当其他线程需要发送数据时,它们将请求(通常包括要发送的查询内容和接收响应的回调函数或用于通知的同步对象)放入此队列。
- 顺序处理: 主通信线程从队列中按顺序取出请求。
- 发送与接收: 对于每个请求,主通信线程执行完整的请求-响应周期:发送查询,然后阻塞式等待设备的响应。
- 响应回传: 收到响应后,主通信线程将结果(或超时错误)通过预设的回调机制或另一个队列发送回最初发起请求的线程。
优点:
- 天然的序列化: 串行端口的访问被强制单线程化,完全避免了并发冲突。
- 高抽象度: 其他线程无需关心底层的同步细节,只需与队列交互。
- 可扩展性: 易于添加更复杂的协议逻辑(如重试机制、错误处理、数据解析等)。
这种方案将串行端口的竞争完全隐藏在队列机制之后,提供了一种简洁且健壮的抽象。
解决方案二:基于互斥锁(Mutex)的独占访问
另一种直接但同样有效的方法是使用互斥锁(Mutex)来保护对串行端口的访问。这种方法不引入额外的通信线程,而是让所有需要访问串行端口的线程在执行I/O操作前,先获取一个共享的互斥锁。
工作原理:
- 共享资源: 串行端口的文件描述符(serial_fd)和互斥锁被视为全局或共享资源。
- 获取锁: 任何线程在开始向串行端口写入或读取之前,必须先获取互斥锁。如果锁已被其他线程持有,当前线程将被阻塞,直到锁被释放。
- 执行完整周期: 获取锁后,线程执行完整的请求-响应周期(发送请求,然后等待并读取响应)。
- 释放锁: 完整的请求-响应周期完成后,线程必须立即释放互斥锁,以便其他等待的线程可以继续。
以下是使用伪代码实现的示例:
// 假设 serial_fd 是串行端口的文件描述符,
// serial_mutex 是保护它的互斥锁
// acquire_the_mutex() 和 release_the_mutex() 是互斥锁操作函数
procedure serial_messaging(u8 *request_mesg, int rqlen, u8 *response_mesg, int rslen)
{
int rc;
// 1. 获取互斥锁,确保独占访问
acquire_the_mutex(); /* 否则调用线程将被阻塞 */
// 2. 发送请求消息
rc = write(serial_fd, request_mesg, rqlen);
if (rc < 0) {
// 处理错误条件
release_the_mutex(); // 错误时也需释放锁
return;
}
// tcdrain(serial_fd); // 对于测量超时可能需要,确保所有数据已从输出缓冲区发送
// 3. 读取响应消息(使用阻塞模式等待)
rc = read(serial_fd, response_mesg, rslen);
if (rc < 0) {
// 处理错误条件
release_the_mutex(); // 错误时也需释放锁
return;
}
// 4. 释放互斥锁,允许其他线程使用串行端口
release_the_mutex(); /* 让另一个线程使用串行端口 */
return; /* 返回接收到的数据 */
}有了 serial_messaging 函数,各个应用线程就可以安全地调用它来发送“foo”或“bar”查询,而无需担心底层的并发问题。例如,在Python中,可以利用 threading.Lock 实现:
import threading
import time
import random
# 模拟串口对象和互斥锁
class MockSerialPort:
def __init__(self):
self.last_query = b""
def write(self, data):
self.last_query = data # 模拟设备知道最后查询了什么
print(f"[{threading.current_thread().name}] Sending: {data.decode()}")
time.sleep(0.05) # 模拟发送时间
def read(self, length):
response = f"response_to_{self.last_query.decode()}".encode()
print(f"[{threading.current_thread().name}] Receiving: {response.decode()}")
time.sleep(0.05) # 模拟接收时间
return response[:length] # 模拟响应长度限制
serial_port = MockSerialPort()
serial_lock = threading.Lock()
def get_with_lock(query):
with serial_lock: # 使用 with 语句自动管理锁的获取和释放
serial_port.write(query)
data = serial_port.read(8) # 假设响应长度为8
return data
def thread1_task():
while True:
response = get_with_lock(b"foo")
print(f"[{threading.current_thread().name}] received: {response.decode()}")
time.sleep(1)
def thread2_task():
time.sleep(random.random()) # 随机等待一段时间
response = get_with_lock(b"bar")
print(f"[{threading.current_thread().name}] received: {response.decode()}")
# 示例:启动两个线程
# threading.Thread(target=thread1_task, name="Thread-Foo").start()
# threading.Thread(target=thread2_task, name="Thread-Bar").start()常见误区与重要考量
在实现串行通信的并发抽象时,有几个常见的误区和重要考量需要注意:
- 位/字节级数据混合的误解: 许多人担心多个线程同时写入会导致数据在位或字节级别上混淆。实际上,操作系统内核的串行端口驱动程序会处理I/O请求的排队和序列化,因此通常不会发生位或字节级别的混淆。真正的问题在于,如果没有同步机制,高层应用程序无法确保在发送新请求之前,前一个请求的完整响应已经接收完毕,从而导致协议违规和逻辑错误。
- 强制执行请求-响应协议: 无论采用哪种方法,核心都是要确保在发送下一个请求之前,前一个请求的完整响应(或超时)已经完成。这意味着 write 操作必须紧跟着相应的 read 操作,并且整个 write-read 周期必须是原子的,不受其他线程干扰。
- 超时处理: 在 read 操作中引入超时机制至关重要。如果设备没有在预期时间内响应,read 操作应该超时并返回错误,而不是无限期阻塞。这有助于提高系统的鲁棒性,防止线程因设备无响应而永久挂起。
-
选择合适的方案:
- 专用线程+队列 方案更适用于复杂协议、需要后台持续轮询或事件驱动的场景。它提供了更强的隔离和更高的抽象层次,使得业务逻辑线程更加简洁。但它引入了额外的线程间通信开销。
- 互斥锁 方案更直接,适用于请求频率不高、协议相对简单的










