
本文深入探讨了如何使用python的simpy库对工厂生产线进行离散事件仿真。通过一个具体的工厂模型案例,我们详细解析了simpy中资源(如操作员、机器人、工装夹具)的定义与管理,以及如何编排复杂的生产流程。重点阐述了资源请求与释放的正确实践,特别是`with`语句与手动请求/释放的区别与适用场景,旨在帮助读者构建高效、准确的仿真模型,并有效排查潜在的死锁问题。
SimPy是一个强大的Python库,用于进行离散事件仿真(Discrete Event Simulation, DES)。在SimPy中,系统状态只在离散的时间点上发生变化,这些时间点被称为“事件”。构建仿真模型主要涉及以下核心概念:
本教程将通过一个模拟工厂生产线的案例来演示SimPy的应用。模型的目标是模拟零件在多个工位(机器人、操作员、夹具)之间流转的整个生产过程。
首先,定义一些全局常量,如仿真时长、操作员数量以及各种工艺操作所需的时间。
import simpy
import itertools
class g: # 全局常量
t_sim = 600 # 仿真时长,这里设置为600秒,原为一天
operator_qty = 2 # 操作员数量
sWeld_t = 2.75 # 点焊时间 (秒/次)
Adh_t = 1/60 # 涂胶时间 (秒/毫米)
ArcStud_t = 5.5 # 拉弧螺柱焊时间 (秒/次)
ProjStud_t = 10 # 凸焊螺柱时间 (秒/次)cell_model类代表整个工厂单元,负责初始化仿真环境和所有生产资源。
class cell_model:
def __init__(self):
self.env = simpy.Environment() # 创建SimPy仿真环境
self.part_counter = 0 # 零件计数器
# 声明资源:操作员、机器人和工装夹具
# PriorityResource 允许指定请求优先级,这里使用零件ID作为优先级,确保FIFO
self.OP = simpy.PriorityResource(self.env, capacity=g.operator_qty)
self.R1 = simpy.PriorityResource(self.env, capacity=1)
self.R2 = simpy.PriorityResource(self.env, capacity=1)
self.R3 = simpy.PriorityResource(self.env, capacity=1)
self.R4 = simpy.PriorityResource(self.env, capacity=1)
self.Fix01 = simpy.PriorityResource(self.env, capacity=1) # 夹具1
self.Fix02 = simpy.PriorityResource(self.env, capacity=1) # 夹具2
self.Fix101 = simpy.PriorityResource(self.env, capacity=1) # 夹具101
# 启动零件生成器进程
self.env.process(self.part_generator())part_generator是一个SimPy进程,负责周期性地生成新的零件,并为每个零件启动一个make_part进程。
def part_generator(self):
for p_Id in itertools.count(): # 无限生成零件ID
yield self.env.timeout(1) # 每隔1秒生成一个新零件
self.env.process(self.make_part(p_Id)) # 为新零件启动生产流程make_part方法是整个模型的核心,它定义了一个零件从进入生产线到完成生产的完整流程。每个零件(由p_Id标识)都会作为一个独立的SimPy进程,按照预定的步骤请求和释放各种资源。
一个零件的生产流程包括:
值得注意的是,机器人R2在流程的开始和结束阶段都会被调用。
在SimPy中,进程通过request()方法请求资源,并通过release()方法释放资源。当资源被请求时,如果其容量已满,请求进程会进入等待队列,直到资源可用。
在SimPy中,处理资源有两种主要方式:使用with语句或手动调用request()和release()。理解这两种方式的区别至关重要,尤其是在资源持有时间跨越多个不连续操作或需要等待其他资源时。
with resource.request() as req: 语句是Python上下文管理器的一种应用。当进程进入with块时,它会请求资源;当进程离开with块(无论是正常完成还是发生异常),资源都会被自动释放。
这种方式简洁方便,适用于资源在单个连续操作中被请求、使用并释放的场景。例如,如果一个机器人执行一个焊接任务,任务完成后立即释放机器人,那么with语句是理想选择。
然而,当一个资源需要在多个不连续的操作步骤中被持有,或者在持有该资源的同时,进程还需要请求其他资源并等待时,with语句的自动释放特性就会成为问题。如果资源在with块结束时被过早释放,而实际上进程仍需要它,就会导致模型行为不正确。
为了解决with语句的局限性,SimPy允许手动管理资源的请求和释放。
请求资源:
resource_request = self.Resource.request(priority=p_Id) yield resource_request # 等待直到资源可用
这里,resource_request是一个事件对象,yield语句使进程暂停,直到资源被成功获取。
释放资源:
yield self.Resource.release(resource_request)
当进程不再需要资源时,它必须显式地调用release()方法来释放之前请求的资源。这允许进程在持有资源的同时执行其他操作、等待其他资源,并在合适的时机释放。
在原始代码中,许多资源(如OP, Fix01, R1, Fix101, R3, Fix02)使用了with语句,但在with块内部又进行了其他资源的请求或长时间的等待。这导致这些资源在with块结束时被过早释放,与实际的生产流程不符。
例如,操作员(OP)在将零件放到Fix01后就应该被释放,而不是等到Fix01的整个操作完成。同样,Fix01在零件被机器人R1移走后才应该被释放。
修正后的代码将这些关键资源的请求和释放改为手动管理,确保资源只在真正不再需要时才被释放。对于那些使用场景确实是连续且独立的资源(如R2和R4的某些步骤),with语句仍然是合适的。
以下是make_part方法中修正后的关键部分:
def make_part(self,p_Id):#描述单个零件在单元中经历的系列过程
### OP请求 ###
OP = self.OP.request(priority=p_Id) # 手动请求操作员
print(f"part {p_Id} OP request at {self.env.now}")
yield OP # 等待操作员可用
print(f"part {p_Id} OP seized at {self.env.now}")
print("Part ", p_Id, " is next available at ", self.env.now, sep="")
yield self.env.timeout(29)# 操作员搬运零件到工作台
### 请求Fix01 ###
Fix01 = self.Fix01.request(priority=p_Id) # 手动请求夹具Fix01
print(f"part {p_Id} Fix01 request at {self.env.now}")
yield Fix01 # 等待夹具Fix01可用
print(f"part {p_Id} Fix01 seized at {self.env.now}")
yield self.env.timeout(27) # 操作员加载零件,退出,开始
### 释放OP ###
yield self.OP.release(OP) # 操作员完成任务,释放
print(f"part {p_Id} OP released at {self.env.now}")
print("Operator 01 released part ", p_Id, " at ", self.env.now, sep="")
yield self.env.timeout(0)
### 请求和释放R2 (使用with语句,因为R2在此阶段使用是连续的) ###
with self.R2.request(priority=p_Id) as R2_req: # R2请求
print(f"part {p_Id} R2 request at {self.env.now}")
yield R2_req # 等待R2可用
print(f"part {p_Id} R2 seized at {self.env.now}")
print("R2 moves to Fixture 01 to work on part ", p_Id, " at ", self.env.now, sep="")
yield self.env.timeout(g.sWeld_t*11) # R2进行11次点焊
print(f"part {p_Id} R2 released at {self.env.now}") # R2在此处自动释放
### 请求R1 ###
R1 = self.R1.request(priority=p_Id) # 手动请求R1
print(f"part {p_Id} R1 request at {self.env.now}")
yield R1 # 等待R1可用
print(f"part {p_Id} R1 seized at {self.env.now}")
yield self.env.timeout(8) # R1将零件移出Fix01
print("Part ", p_Id, " is leaving Fixture 01 at ", self.env.now, sep="")
### 释放Fix01 ###
yield self.Fix01.release(Fix01) # Fix01被清空,释放
print(f"part {p_Id} Fix01 released at {self.env.now}")
# ... (后续流程类似,手动管理OP, Fix101, R3, Fix02的请求和释放)通过这种方式,我们可以精确控制资源的生命周期,使其与实际的物理流程更加吻合。
cell_model的run方法负责启动SimPy仿真环境,并指定仿真运行的时间长度。
def run(self,t_sim=g.t_sim): # 运行方法启动实体生成器并告诉Simpy开始运行环境
self.env.run(until=t_sim)即使修正了资源释放逻辑,复杂的仿真模型仍然可能遇到“死锁”问题。死锁发生在两个或多个进程相互等待对方释放资源,从而导致所有相关进程都无法继续执行的情况。
例如,如果进程A持有资源X并等待资源Y,同时进程B持有资源Y并等待资源X,那么就会发生死锁。
在修正后的代码中,通过添加详细的print语句来记录每个零件在何时请求、获取和释放了哪个资源,这对于识别死锁非常有帮助。通过分析这些日志,可以追踪进程的执行路径,找出哪些资源被长时间持有,以及哪些进程在等待无法获得的资源。当仿真提前结束或某些进程不再有输出时,通常是死锁的迹象。
import simpy
import itertools
class g:#global constants
t_sim = 600 #24*60*60 #maximum simulation set to a day and 1 second
operator_qty = 2 #how many operators? (assume as interchangeable across any operator type process, helps us ask "what is the difference between two running it and one?")
sWeld_t = 2.75 #seconds per spot weld
Adh_t = 1/60 #seconds/mm to apply sealant or adhesive
ArcStud_t = 5.5 #seconds/drawn arc stud weld
ProjStud_t = 10 #seconds/projection stud weld
class cell_model: #represents entire factory cell in which all processes, resources and the modelling environment itself are held
def __init__(self):#initial conditions of cell
self.env = simpy.Environment() #create simpy environment
self._dict=dict()
self.part_counter = 0 #initialize @ part number zero
self.OP = simpy.PriorityResource(self.env,capacity=g.operator_qty)#implement resources, limited number of operators to do operator process, we assume operators can switch
#Declare all robots as resources, process cannot proceed unless proper resource is available
self.R1 = simpy.PriorityResource(self.env,capacity=1) #declare each robot as a resource (like a bin), each robot can only hold 1 part entity at a time, each robot can be in or out of a process (availability changes by request and release commands within a process)
self.R2 = simpy.PriorityResource(self.env,capacity=1)
self.R3 = simpy.PriorityResource(self.env,capacity=1)
self.R4 = simpy.PriorityResource(self.env,capacity=1)
#Declare all Fixture points as resources, a fixture can be interacted with as long as the robot is outside the relevant fixture process
self.Fix01 = simpy.PriorityResource(self.env,capacity=1)#a bin of capacity 01, holds one part at a time, becomes available once released by a process, becomes unavailable when requested by a process
self.Fix02 = simpy.PriorityResource(self.env,capacity=1)
self.Fix101 = simpy.PriorityResource(self.env,capacity=1)
self.env.process(self.part_generator())
def make_part(self,p_Id):#describe the set of processes one single part would experience going through the cell, we will then try to cram the parts throught this process as fast as possible
### OP request ###
OP = self.OP.request(priority=p_Id)
print(f"part {p_Id} OP request at {self.env.now}")
yield OP #request an operator and wait until one is available
print(f"part {p_Id} OP seized at {self.env.now}")
print("Part ", p_Id, " is next available at ", self.env.now, sep="")#report part in time
yield self.env.timeout(29)#operator walks part to tape bench, tapes part, takes part to fixture 01
### reqeust Fix01 ###
Fix01 = self.Fix01.request(priority=p_Id)
print(f"part {p_Id} Fix01 request at {self.env.now}")
yield Fix01 #while using the operator resource, request the Fixture, wait until it is avialable to proceed further
print(f"part {p_Id} Fix01 seized at {self.env.now}")
yield self.env.timeout(27) #operator loads parts, exits, hits start.
### op release ###
yield self.OP.release(OP) #done with operator, release him back to resource pool
print(f"part {p_Id} OP released at {self.env.now}")
print("Operator 01 released part ", p_Id, " at ", self.env.now, sep="")
yield self.env.timeout(0) #take a beat b4 next request
### request and release R2 ###
### R2 is released in the with block so we can keep this one
with self.R2.request(priority=p_Id) as R2_req:
print(f"part {p_Id} R2 request at {self.env.now}")
yield R2_req #I need R2 now, request to proceed
print(f"part {p_Id} R2 seized at {self.env.now}")以上就是SimPy离散事件仿真:工厂生产线建模与资源管理优化的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号