
本文介绍如何使用pandas直接读取psa文本文件,精准筛选以“prod”开头的行、提取第3列数值,并从文件名中正则提取门店编号,最终生成结构清晰的csv表格,全程无需中间临时文件。
在处理批量科学数据文件(如.psa格式)时,常见需求是:仅解析特定标识行(如prod开头)、提取固定位置字段(如第3个逗号分隔值),并关联元信息(如文件名中的门店编号)。传统做法常依赖先写入临时TXT/CSV再读取,不仅冗余,还易引入编码、换行或列对齐问题。更高效、健壮的方式是直接用Pandas流式解析原始文件,结合条件过滤与正则提取。
以下为完整可执行方案(支持单文件及批量处理):
✅ 核心思路
- pd.read_csv(..., usecols=[0, 2], header=None):跳过表头,仅加载第1列(索引0,用于判断prod)和第3列(索引2,目标数值),避免读取全部30+列,显著提升性能;
- df[df['type'] == 'prod']:布尔索引快速过滤,仅保留prod行;
- .drop(columns='type'):移除用于过滤的辅助列,保留纯净数值;
- re.search(r'store\s+(\d+)', filename).group(1):精准捕获文件名中store后紧跟的数字(如store 15 → 15),鲁棒性强,兼容空格/大小写变体;
- .assign(store=store):新增store列并广播赋值,简洁高效。
✅ 单文件处理示例代码
import pandas as pd
import re
# 指定输入文件路径
fname = '1 Area 2 - store 15 group.psa'
# 直接读取第1列(type)和第3列(num),无表头
df = pd.read_csv(fname, usecols=[0, 2], header=None, names=['type', 'num'])
# 提取文件名中的门店编号(支持 "store 15", "store15", "STORE 15" 等)
store_match = re.search(r'[sS][tT][oO][rR][eE]\s*(\d+)', fname)
if not store_match:
raise ValueError(f"无法从文件名 '{fname}' 中提取门店编号,请检查命名格式")
store_num = store_match.group(1)
# 过滤、清理、添加门店列
result_df = (df[df['type'] == 'prod']
.drop(columns='type')
.assign(store=store_num))
# 输出为CSV(列名为 num,store)
result_df.to_csv("output.csv", index=False)
print("✅ 处理完成!输出已保存至 output.csv")✅ 批量处理 ZIP 内所有 PSA 文件
若需遍历ZIP包内全部.psa文件,可结合zipfile与io.StringIO实现内存级读取(避免解压到磁盘):
import zipfile
import pandas as pd
import re
from io import StringIO
def process_psa_from_zip(zip_path, output_dir="output"):
import os
os.makedirs(output_dir, exist_ok=True)
all_results = []
with zipfile.ZipFile(zip_path, 'r') as z:
for file_info in z.filelist:
if file_info.filename.lower().endswith('.psa'):
# 从ZIP中读取文件内容(UTF-8编码,按需调整)
content = z.read(file_info.filename).decode('utf-8')
# 构造伪文件对象供pandas读取
df = pd.read_csv(StringIO(content), usecols=[0, 2], header=None, names=['type', 'num'])
# 提取门店号(从ZIP内文件名)
store_match = re.search(r'[sS][tT][oO][rR][eE]\s*(\d+)', file_info.filename)
store_num = store_match.group(1) if store_match else "unknown"
# 过滤并附加门店列
result = (df[df['type']=='prod']
.drop(columns='type')
.assign(store=store_num))
result['source_file'] = file_info.filename # 可选:记录来源
all_results.append(result)
# 合并所有结果并导出
if all_results:
final_df = pd.concat(all_results, ignore_index=True)
final_df.to_csv(f"{output_dir}/combined_output.csv", index=False)
print(f"✅ 批量处理完成!共 {len(all_results)} 个文件,结果已合并至 {output_dir}/combined_output.csv")
else:
print("⚠️ ZIP中未找到PSA文件")
# 调用示例
# process_psa_from_zip("data.zip")⚠️ 注意事项
- 编码兼容性:若PSA文件含中文或特殊字符,z.read(...).decode() 中需指定正确编码(如gbk、latin-1),建议先用chardet库探测;
- 列索引校验:PSA文件若存在空行或不规范分隔符,usecols=[0,2]可能报错,可添加on_bad_lines='skip'(Pandas ≥1.4);
- 正则健壮性:[sS][tT][oO][rR][eE]确保大小写不敏感;\s*(\d+)容忍store15或store 15等格式;
- 性能优化:对超大文件,可改用chunksize参数分块处理,避免内存溢出。
通过以上方法,您不仅能精准提取目标数据,还能将文件元信息无缝融入分析流程,真正实现“一行代码驱动多文件自动化处理”。










