T'SPACE

다채로운 에디터들의 이야기

컴퓨터공학/인공지능

K-fold Cross Validation 심화편 (Data Leakage, Stratified)

Tonny Kang 2024. 4. 4. 11:35
반응형

https://tonnykang.tistory.com/137

 

k-fold cross-validation 교차 검증 (오버피팅 방지)

cf) 데이터train data : 학습을 통해 가중치, 편향 업데이트validation data : 하이퍼파라미터 조정, 모델의 성능 확인test data : 모델의 최종 테스트하이퍼파라미터 : 값에 따라서 모델의 성능에 영향을 주

tonnykang.tistory.com

위에서 알 수 있다시피

K-fold Cross-validation은 데이터 수가 적어 underfitting되는 상황을 방지해주고
더 일반화 된 모델을 만드는데 도움이 된다

그러나 문제점이 몇가지 있다

728x90

왜 머신러닝에서는 랜덤 샘플링을 선호하지 않을까?

이진 분류 문제를 예시로 들자

우리의 데이터셋은 샘플 100개로 구성되어있고

80개의 '0', 20개의 '1'로 이루어져 있다고 하자

 

Random Sampling:

Random Split을 8:2로 실행해 train_data, test_data로 나눈다면 모든 '0'들이 train에 들어가고 모든 '1'들이 test에 들어갈 수 있다

그럼 이러한 경우에는 돌려보지 않아도 안좋은 결과가 나올것을 알 수 있다

 

이와 상반되는 내용이 바로

 

Stratisfied Sampling:

Stratisfied Sampling을 진행하면 train에는 64개의 '0'과 16개의 '1' 그리고 test에는 16개의 '0'과 4개의 '1'로 split이 일어날 것이다 바로 sample data의 분포를 반영해 equal하게 train, test 데이터를 나누는 기법이다

 

sklearn에서 제공하는 library를 활용하면 python에서 쉽게 활용할 수 있다!

from sklearn.model_selection import StratifiedKFold

# Create StratifiedKFold object.
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=1)

for train_index, test_index in skf.split(feature, label):

 

하지만!

주의해야할 점이 이것 뿐만이 아니다

간혹 모델을 구현하다가 예측값의 정확도가 예상보다 너무 좋게 나와서 흐뭇했던 적이 있을것이다. (꽤나 ㅋㅋㅋㅋ)

 

Test Data, Validation Data, Train Data간의 가장 중요한 조건이 

서로 노출 되지 않는 것이다!

만약 노출이 되면 Data Leakage 라고 한다.

반응형

아니면 전처리과정이 너무 더러워져서 보기 싫고

코드를 공유하기 어려워 졌다면

활용해야할 기법있다

 

Pipeline

Pipeline은 당신의 머신러닝 코드의 여러 단계를 순차적으로 한번에 수행하게 해주는 도구이다

scikit-learn의 Pipeline 라이브러리는 내가 data에 가하고 싶은 transformation들을 값들로 지정하면된다

매우 간단한 pipeline의 예시는 아래와 같다

from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LinearRegression

pipeline = Pipeline(
   steps=[("imputer", SimpleImputer()), 
          ("scaler", MinMaxScaler()), 
          ("regression", LinearRegression())
   ]
)

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

 

위에 pipeline을 확인해보면 steps에 첫번째

"imputer" 그후 "scaler", 그리고 모델 "Linear Regression"이 있다

 

그래서 밑에

pipeline.fit()을 하면 imputer를 통해 결측값들을 채우고

scaler를 통해 내가 원하는 범위로 scaling하고

Linear Regression 모델을 활용한다고 보면 된다

 

심지어 transformer들에 이름도 지어주기 싫다면

make_pipeline()을 활용하면 transformer의 class 이름을 바탕으로 알아서 이름을 지어준다

from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LinearRegression

pipeline = make_pipeline(steps=[
    SimpleImputer(),
    MinMaxScaler(),
    LinearRegression()
    ]
)

 

내가 원하는 Feature들만 Transform 하기

 

2개 이상의 feature를 가지는 data를 가지고 있으면, 어지간해서 종류도 다를거다

연속적인 feature, 범주형 feature... 등등

그러면 전처리 과정과 transform과정을 다르게 접근하고 싶을 것이다 

이 또한 ColumnTransformer을 통해 가능하다

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

categorical_transformer = ColumnTransformer(
    transformers=[("encode", OneHotEncoder())]
)

pipeline = Pipeline(steps=[
    ("categorical", categorical_transformer, ["col_name"])
    ]
)

 

여기서 ColumnTransformer에게 나머지 남는 열(feature)들 가지고는 어떡하고 싶은지 말해줄 수 있다.

예를 들어 remainder="passthrough"라고 설정해주면 전처리 과정을 거치지 않은 나머지 열들도 같이 Pipeline을 통과한다

임시적으로 이렇게 지시해주지 않으면 다 drop되기 때문에 말해줘야한다

 

또는 아래 코드 처럼 나머지 코드에게는 다른 transform 과정을 취하면 된다

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

categorical_transformer = ColumnTransformer(
 transformers=[("encode", OneHotEncoder(), ["col_name"])], remainder="passthrough"
)

categorical_transformer = ColumnTransformer(
 transformers=[("encode", OneHotEncoder(), ["col_name"])], remainder=MinMaxScaler()
)
```

 

scikit-learn의 또 다른 기능으로는 pipeline stacking을 허용하기에

Pipeline에 pipeline을 전해줄 수 있다

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

categorical_transformer = Pipeline(steps=[("encode", OneHotEncoder())])
numerical_transformer = Pipeline(
   steps=[("imputation", SimpleImputer()), ("scaling", MinMaxScaler())]
)

preprocessor = ColumnTransformer(
   transfomers=[
     ("numeric", numerical_transformer),
     ("categoric", categorical_transformer, ["col_name"]),
   ]
)

pipeline = Pipeline(steps=["preprocesssing", preprocessor])

 

아니면 FeatureUnion이라는 기능을 활용해

각 pipeline의 결과 feature들을 합쳐주는 기능을 한다

이를 통해 transformer fitting은 따로 해줄 수 있는 장점이 있다

 

예를 들어 우리가 moving average를 feature로 추가하고 싶으면 아래와 같이 가능하다

from sklearn.compose import FeatureUnion
from sklearn.pipeline import Pipeline

preprocessor = (
   FeatureUnion(
     [
       ("moving_Average", MovingAverage(window=30)),
       ("numerical", numerical_pipeline),
     ]
   ),
)

pipeline = Pipeline(steps=["preprocesssing", preprocessor])

 

Target Value, 출력 값의 변환

 

가끔은 출력 값도  regression을 하기 전에 transform 하는 경우가 있다

여기서 출력 값은, 우리가 예측 하겠다하는 값이다

이러한 Transformer은 TransformedTargetRegressor을 활용해 쓸 수 있다

 

이 library의 최고 장점은 예측값 출력시 다시 자동 원본 공간으로 mapping을 해주는 것이다!

예시를 보면 이해하기 더 쉬울 것이다

 

from sklearn.compose import TransformedTargetRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler

regressor = TransformedTargetRegressor(
    regressor=model, 
    func=np.log1p, 
    inverse_func=np.expm1
)

pipeline = Pipeline(
   steps=[
      ("imputer", SimpleImputer()), 
      ("scaler", MinMaxScaler()), 
      ("regressor", regressor)
    ]
)

pipeline.fit(X_train, y_train)

y_pred = pipeline.predict(X_test)

 

func=np.log1p를 살펴보면, data의 skewedness를 완화시켜주는 로그 함수를 활용하는데

log1p는  log(0)을 피하기 위해 1을 더해주는 함수를 transformer로 이용 한 것이다

이와 쌍으로

 

inverse_func=np.expm1을 보면 e^(x-1)함수를 이용하겠다는 것이며

다시 원상태로 돌려놓는 기능을 한다

 

그러나 가끔 내가 원하는 특별한 transformer가 필요할 때가 있다

 

Custom Functions

 

쉽게 내가 원하는 함수를 정의 할 수 있다

Class에 fit()과 transform() method들만 있으면 가능하다

심지어 필요 없으면 이 둘은 아무것도 하지 않아도 되고 또는

scikit-learn의 BaseEstimater과 TransformerMixin class에서 상속 받아도 된다

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler

class MovingAverage(BaseEstimator, TransformerMixin):

    def __init__(self, window=30):  
        self.window = window
    
    def fit(self, X, y=None):  
        return self
    
    def transform(self, X, y=None):
        return X.rolling(window=self.window, min_periods=1, center=False).mean()


pipeline = Pipeline(
   steps=[
       ("ma", MovingAverage(window=30)),
       ("imputer", SimpleImputer()),
       ("scaler", MinMaxScaler()),
       ("regressor", model),
   ]
)

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

 

Imputer이 뭔지 모른다면?

https://tonnykang.tistory.com/166

 

Scikit-learn, Imputer 결측값 처리기 (null values, nan)

결측 값은 AI 개발자들에게 매우 큰 골칫 거리이다 전처리의 기본 단계이며 결측 값들을 채우는 방법은 매우 많다 가능한 다른 Feature들과 관계를 찾아서 채우면 좋겠지만 불가능하거나 너무 복

tonnykang.tistory.com

Pipeline을 활용해

Data Leakage를 방지하며

K-fold Cross Validation을 어떻게 할 수 있을까?

 

예시를 위해 모의 데이터를 가져와보자

from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split

# prepare data
data = load_boston()
X, y = data['data'], data['target'] 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

 

그 후 3개의 fold로 나눠보자

간단하죠?

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score

# standardizaation
ss = StandardScaler()
X_train = ss.fit_transform(X_train)
X_test = ss.transform(X_test)

# evaluate the model using cross-validation
scores = cross_val_score(estimator=LinearRegression(), 
                         X=X_train, 
                         y=y_train, 
                         scoring='neg_mean_squared_error', 
                         cv=3)

 

하지만 이 몇줄의 코드들만해도 벌써 엄청난 오류를 범했습니다

 

정규화같은 변환들은 전체 data의 분포를 활용해 변환을 한다

 k-fold로 split되기 전에 이를 진행하면 train set이 validation set의 영향을 받음을 뜻한다

-> Data Leakage!!

 

진짜 뭐 같은게 뭐냐면

에러가 발생되지 않기 때문에 신경쓰지 않으면 일어나는지도 모를것이다 ㅋㅋㅋ

 

from sklearn.model_selection import GridSearchCV

# standardization
ss = StandardScaler()
X_train = ss.fit_transform(X_train)
X_test = ss.transform(X_test)

# perform grid search on training data
param_grid = {'fit_intercept':[True, False]}
gs = GridSearchCV(estimator=LinearRegression(),
                  param_grid=param_grid,
                 scoring='neg_mean_squared_error', 
                 cv=3, 
                 n_jobs=-1)

# perform grid search
gs.fit(X_train, y_train)

 

위와같은 grid-search 같은 경우에도 마찬가지다

 

해결책?

 

Pipeline!!

from sklearn.pipeline import Pipeline

# create pipeline
pipeline = Pipeline(steps=[('scaler', StandardScaler()),
                   ('model', LinearRegression())])

# perform cross validation
scores = cross_val_score(estimator=pipeline, 
                         X=X_train, 
                         y=y_train, 
                         scoring='neg_mean_squared_error', 
                         cv=3, 
                         n_jobs=-1)

 

이와같이 Pipeline을 활용해 앞선 코드를 수정해보면, Data Leakage 현상을 피할 수 있다

 

위에 보여준 grid-search와 같은 경우에도 pipeline을 활용하면 

머신러닝 알고리즘을 estimator에 파라미터로 주는 대신 pipeline object를 주는 것이다

# create pipeline
pipeline = Pipeline(steps=[('scaler', StandardScaler()),
                   ('model', LinearRegression())])

# create grid search object
param_grid = {'model__fit_intercept':[True, False]}
gs = GridSearchCV(estimator=pipeline,
                  param_grid=param_grid,
                 scoring='neg_mean_squared_error', 
                 cv=3, 
                 n_jobs=-1)

# perform grid search
gs.fit(X_train, y_train)

 

요약


 

K-fold Cross Validation 할꺼면 Stratified로

->Data Leakage 주의

 

fold로 split되기 전에 transform하면 전체 데이터의 분포를 사용하기 때문에 

->Data Leakage

 

그래서 split하고나서 transform 해야함

->Pipeline 활용

반응형