本文详解如何正确实现一个支持标签同步裁剪的自定义 OutlierRemover,使其能无缝集成到 imblearn.pipeline.Pipeline(而非 sklearn.pipeline.Pipeline)中,解决因 X 与 y 样本数不一致导致的 ValueError。
本文详解如何正确实现一个支持标签同步裁剪的自定义 `outlierremover`,使其能无缝集成到 `imblearn.pipeline.pipeline`(而非 `sklearn.pipeline.pipeline`)中,解决因 `x` 与 `y` 样本数不一致导致的 valueerror。
在构建端到端的不平衡分类流水线(pipeline)时,常需组合采样器(如 RandomOverSampler)与数据清洗步骤(如异常值移除)。但若自定义 Transformer 同时修改 X 和 y(例如剔除含异常特征的样本及其对应标签),直接继承 sklearn 的 BaseEstimator + TransformerMixin 并实现 transform() 方法将导致严重问题——因为标准 sklearn.pipeline.Pipeline 仅向 transform() 传递 X,且不接收或处理返回的 y。这正是报错 ValueError: Found input variables with inconsistent numbers of samples: [310231, 363920] 的根源:X 被删减后变短,而 y 未同步更新,造成维度失配。
要解决此问题,必须遵循 imblearn 的接口规范:所有参与 imblearn.pipeline.Pipeline 的组件(包括自定义类)都应实现 resample(self, X, y) 方法,而非 transform()。imblearn 的 pipeline 会显式调用 resample() 并确保 X 与 y 同步处理。
以下是符合 imblearn 标准的 OutlierRemover 正确实现:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
class OutlierRemover(BaseEstimator, TransformerMixin):
def __init__(self, columns=None, iqr_multiplier=1.5):
self.columns = columns or []
self.iqr_multiplier = iqr_multiplier
def fit(self, X, y=None):
# fit() 不执行实际计算,仅返回 self(符合 sklearn 惯例)
return self
def resample(self, X, y=None):
"""
imblearn 兼容的 resample 方法:同步过滤 X 和 y
返回 (X_resampled, y_resampled) 或仅 X_resampled(若 y 为 None)
"""
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
# 初始化掩码:默认保留全部样本
mask = pd.Series([True] * len(X))
for col in self.columns:
if col not in X.columns:
continue
q25, q75 = np.percentile(X[col], 25), np.percentile(X[col], 75)
iqr = q75 - q25
cutoff = iqr * self.iqr_multiplier
lower, upper = q25 - cutoff, q75 + cutoff
# 更新掩码:仅保留非异常值样本
col_mask = (X[col] >= lower) & (X[col] <= upper)
mask = mask & col_mask
X_resampled = X[mask].reset_index(drop=True)
if y is not None:
y_resampled = pd.Series(y).iloc[mask].reset_index(drop=True)
return X_resampled, y_resampled
return X_resampled, None # 注意:imblearn 期望返回 tuple,即使 y 为 None✅ 关键要点说明:
- 必须定义 resample(self, X, y) 方法(非 transform),这是 imblearn 流水线识别可重采样组件的唯一依据;
- fit() 方法保持空实现(仅 return self),无需保存统计量(本例为无状态过滤);
- resample() 必须返回 (X_resampled, y_resampled) 元组;当 y=None 时,也应返回 (X_resampled, None) 以保证接口一致性;
- 使用布尔索引 mask 一次性过滤,避免循环 drop() 导致索引错乱;
- 显式调用 .reset_index(drop=True) 确保输出 DataFrame/Series 索引连续,防止下游模型出错。
随后,在流水线中使用 imblearn.pipeline.Pipeline(不是 sklearn.pipeline.Pipeline 或 make_pipeline):
from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import RandomOverSampler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
# 注意:此处必须用 imblearn.pipeline.Pipeline
pipeline = ImbPipeline([
('oversampler', RandomOverSampler(random_state=42)),
('outlier_remover', OutlierRemover(columns=['V14', 'V12', 'V10', 'V4', 'V11', 'V2'])),
('classifier', LogisticRegression(max_iter=200))
])
# 参数搜索需包裹在 pipeline 内部(推荐方式)
param_dist = {
'classifier__penalty': ['l1', 'l2'],
'classifier__C': [0.001, 0.01, 0.1, 1, 10, 100]
}
rand_search = RandomizedSearchCV(
pipeline, param_dist, n_iter=4, cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
scoring='f1', n_jobs=-1, random_state=42
)
# 执行交叉验证(无需手动拆分循环)
rand_search.fit(Org_X_train, Org_y_train)
best_model = rand_search.best_estimator_
y_pred = best_model.predict(Org_X_test)
# 评估...⚠️ 重要注意事项:
- 切勿混用 sklearn.pipeline.Pipeline 与 imblearn 采样器/自定义 resample 组件,否则 resample() 方法不会被调用;
- make_pipeline 是 sklearn 工具函数,不支持 resample 接口,必须显式使用 ImbPipeline([...]);
- 若需在 resample() 中依赖训练数据分布(如基于 fit() 计算的阈值),应在 fit() 中计算并存储(如 self.lower_bounds_, self.upper_bounds_),再于 resample() 中复用;
- 异常值过滤本质是数据清洗,通常应在采样之后执行(如本例),以避免对人工合成样本误判;若需在采样前过滤,需调整 pipeline 顺序。
通过严格遵循 imblearn 的 resample 协议,即可构建鲁棒、可复用、与采样器无缝协同的自定义数据预处理器,彻底规避样本数不一致的运行时错误。










