
在使用snakemake管理复杂生物信息学工作流时,尤其是在slurm等集群环境中,用户可能会遇到一个常见问题:当规则内部执行python代码(例如使用print()语句)时,其输出不会像外部shell命令那样实时显示在slurm的输出文件中,而是在脚本完成或失败后才一次性输出。这主要是因为python的stdout(标准输出)默认是带有缓冲的,尤其是在非交互式环境中。当snakemake将任务提交给slurm时,slurm会将任务的标准输出和标准错误重定向到作业的日志文件中,python的缓冲机制导致这些输出不会立即写入文件。
解决方案:刷新标准输出
要强制Python实时输出,可以通过显式刷新标准输出缓冲区。在Python脚本中,可以使用sys.stdout.flush()来达到此目的。
import sys
# ... 其他代码 ...
print("========RUNNING JOB SPLADDER=========")
sys.stdout.flush() # 立即刷新输出
print("
")
sys.stdout.flush() # 立即刷新输出
# ... 后续代码 ...然而,仅仅刷新输出只是解决了表面问题。更深层次地,原有的Snakemake规则设计存在一些可以优化的地方,这些优化不仅能提升实时输出的可观察性,更能显著提高工作流的并行效率、鲁棒性和可维护性,尤其是在集群环境下。
原始的spladder规则试图在一个Snakemake规则内部迭代处理多个基因组(genome),这与Snakemake的设计哲学相悖。Snakemake的核心优势在于其能够自动并行化处理独立的任务。一个理想的Snakemake规则应该针对单个输出目标定义,并利用通配符(wildcards)来泛化规则,让Snakemake引擎负责调度和并行执行。
立即学习“Python免费学习笔记(深入)”;
以下是原始规则中存在的问题及对应的优化策略:
规则粒度过大: 原始规则在一个run块中循环处理所有基因组,这意味着即使有多个CPU核心,Snakemake也只能将整个spladder任务作为一个单元提交给Slurm,无法实现基因组级别的并行。
输出不完整性: 如果某个基因组没有对应的rsa_ids,那么其对应的输出文件将不会被生成。这可能导致Snakemake在后续的清理或检查阶段报错,因为它期望所有声明的输出都能被创建。
Python run块与shell指令的滥用: 原始规则在run块中编写了大量Python逻辑来构建命令行字符串,这使得规则不易读,且失去了Snakemake shell指令的简洁性和鲁棒性。
列表推导式与expand函数: 在Snakemake中,expand函数是生成文件路径列表的推荐方式,它与通配符和输入/输出机制配合更佳。
以下是根据上述优化策略重构后的Snakemake工作流,以spladder任务为例:
import re
from pathlib import Path
import pandas as pd # 假设 accessions 是一个 pandas DataFrame
# 模拟 accessions DataFrame,实际应从文件加载
accessions_data = {
'genome_id': ['genomeA', 'genomeB', 'genomeA', 'genomeC'],
'rsa_id': ['rsa1', 'rsa2', 'rsa3', 'rsa4']
}
accessions = pd.DataFrame(accessions_data, index=['rsa1', 'rsa2', 'rsa3', 'rsa4'])
# 1. 定义最终目标规则 `all`
# 这个规则负责定义整个工作流的最终输出,并进行必要的预过滤
rule all:
"""
定义所有需要生成的spladder输出文件。
在构建目标列表时,我们预先过滤掉那些没有对应rsa_ids的基因组,
以避免Snakemake尝试为不可能生成的输出创建任务。
"""
input:
expand(
"data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle",
genome = [
genome_id
for genome_id in accessions['genome_id'].unique()
if len(accessions[accessions['genome_id'] == genome_id]) > 0
]
)
# 2. 定义辅助函数 `spladder_input`
# 这个函数根据通配符 `wildcards.genome` 动态查找并返回该基因组所需的所有输入文件
def spladder_input(wildcards):
"""
根据基因组通配符查找并返回spladder规则所需的输入文件。
包括基因组GTF文件和所有相关RSA ID的BAM文件。
"""
filtered_accessions = accessions[accessions['genome_id'] == wildcards.genome]
rsa_ids = filtered_accessions.index.values
# 确保每个基因组都有对应的BAM文件,如果没有则抛出错误或跳过
if len(rsa_ids) == 0:
raise ValueError(f"No rsa_ids found for genome: {wildcards.genome}")
return {
'genome_gtf': f"../ressources/genomes/{wildcards.genome}/genomic.gtf",
'bams': expand("data/alignments/{rsa}/{rsa}_Aligned.sortedByCoord.out.bam", rsa=rsa_ids),
}
# 3. 定义泛化的 `spladder` 规则
# 这个规则现在只负责处理单个基因组的spladder任务
rule spladder:
input:
# 使用unpack函数将spladder_input函数返回的字典解包为input关键字参数
unpack(spladder_input)
output:
# 针对单个基因组定义输出文件
"data/spladder/{genome}/merge_graphs_mutex_exons_C3.pickle"
threads: 20 # 考虑调整线程数,有时较少的线程和更多的作业更高效
resources:
mem_mb=1024*20,
runtime=60*8
params:
# 将BAM文件列表转换为逗号分隔的字符串,供shell命令使用
bams_str=lambda wildcards, input: ','.join(input.bams),
# 从输出路径中提取目录作为outdir参数
outdir=lambda wildcards, output: Path(output).parent
log:
"logs/spladder/{genome}.log" # 定义日志文件,方便Slurm模式下查看输出
shell:
"""
mkdir -p {params.outdir} && \
spladder build \
--set-mm-tag nM \
--bams {params.bams_str} \
--annotation {input.genome_gtf} \
--outdir {params.outdir} \
--parallel {threads} > {log} 2>&1
"""通过上述重构,我们不仅解决了Python脚本在Slurm模式下实时输出不显示的问题(通过日志重定向),更重要的是,将Snakemake工作流提升到了一个更高效、更健壮的层次:
注意事项:
遵循这些最佳实践,可以构建出高效、可靠且易于管理的Snakemake工作流,即使在复杂的集群环境中也能游刃有余。
以上就是Snakemake Slurm模式下Python脚本实时输出与规则优化实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号