반전공자

[PySpark] MLlib 소개하기 - 데이터셋 로드(NCHS) 본문

데이터분석/PySpark

[PySpark] MLlib 소개하기 - 데이터셋 로드(NCHS)

하연01 2023. 3. 15. 23:13

토마스 드라마브, 데니 리의 "PySpark 배우기"를 보고 배워나가는 과정을 기록한 글입니다 ♪


PySpark의 MLlib 패키지를 이용해 실제로 분류 모델을 학습해보자! 

 

MLlib = Machine Learning Library

→ 스트리밍에 대해 학습 모델링을 지원하는 유일한 라이브러리 

 

 

다룰 내용

- MLlib으로 모델링하기 위해 데이터 준비

- 통계적 테스트 수행

- 로지스틱 회귀를 통해 유아 생존율 예측

- 가장 예측 가능한 feature 선택과 랜덤 포레스트 모델 학습 

 

 

 

패키지에 대한 개요

세 단계의 머신러닝 기능 

1. 데이터 전처리 : feature 추출, 변형, 선택, 카테고리 feature에 대한 해싱, 자연어 처리 기술

2. 머신 러닝 알고리즘 : 몇몇 유명하고 고급 레벨인 회귀, 분류, 군집화 알고리즘이 개발되어 있음

3. 유틸리티 : 기술 통계, chi-square 테스트, 선형대수, 모델 평가 방법론과 같은 통계적 방법론

 

 

→ 대부분의 주요 데이터 사이언트 업무 수행 가능! 

 

사용할 데이터 

- US 2014, 2015 출생 데이터의 일부(http://www.tomdrabas.com/data/LearningPySpark/births_train.csv.gz)

 

 

 

 

데이터 로딩하기와 변형하기 

- MLlib이 RDD, DStream에 중점을 두고 디자인되긴 했지만, 데이터 변형에 용이하도록 데이터프레임으로 변형하자

 

 

1. 데이터셋 스키마 명시 

labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]
schema = typ.StructType([
        typ.StructField(e[0], e[1], False) for e in labels
    ])

 

 

 

2. 데이터 로드 

 

births = spark.read.csv('/Users/hayeon/Downloads/births_train.csv.gz', header=True, schema=schema)

불러오고 바로 births.show() 를 해봤자 알아보기 힘든 데이터셋만 보인다. 

때문에 레코드 사전을 먼저 명시해주자 

 

 

 

2-1. 레코드 사전 명시 

 

recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}

→ Yes를 1로 바꾼다는 의미의 RE 코드 

 

 

※ 이번 장의 목적?

    - INFANT_ALIVE_AT_REPORT(생존여부)가 1인지 0인지 예측 

    → 상관되는 모든 feature는 제거하고, 유아의 생존율은 오로지 어머니, 아버지, 출생지와 관련된 feature 기반으로 예측할 예정

 

 

 

2-2. feature 선택

 

selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]
births_trimmed = births.select(selected_features)

》 Yes = 1, No/Unknown = 0

※ 어머니의 흡연량(CIG ~)을 레코드할 때 문제? 

    - 흡연량 = 0 : 임신기간 동안 흡연하지 않음

    - 1 < 흡연량 < 97 : 1 ~ 97개 사이 

    - 흡연량 = 98 : 98개 이상

    - 흡연량 = 99 : Unknown이지만, 0으로 레코드 됨 

 

 

 

2-3. 레코드 함수 정의

import pyspark.sql.functions as func

def recode(col, key):        
    return recode_dictionary[key][col] 

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat))\
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())

recode 함수 

    - recode_dictionary로부터 주어진 키 값에 대한 올바른 값을 찾고 올바른 값 리턴

 

correct_cig 함수 

    - feature의 값(feat)이 99와 다르면 그 값을 리턴, 99일 때 0을 갖게 됨

 

※ recode 함수를 데이터프레임에 직접 사용 불가 

   스파크가 이해할 수 있는 UDF로 변환해야 함 → rec_integer 함수로 가능

 

rec_integer 함수 

    - 명시된 recode 함수를 전달하고 리턴 값의 데이터 타입을 명시 

 

≫ 이제 Yes / No / Unknown feature 인코드에 사용 가능! 

   

 

 

2-4. 흡연량 관련 피쳐 고치기

births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

※ withColumn()

   - 첫 번째 파라미터 : 컬럼명

   - 두 번째 파라미터 : 트랜스포메이션 

 

 

 

 

- Yes / No / Unknown 피쳐 번형

 

cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

    if 'Y' in dis:
        YNU_cols.append(s[0])

→ 컬럼명과 그에 상응하는 데이터 타입을 갖는 튜플 리스트 생성! 

→ 루프를 돌아 모든 스트링 컬럼의 고유 값 계산 (= Y가 리스트 안에 있으면 해당 컬럼을 YNU_cols 리스트에 추가) 

 

 

※ 데이터프레임은 feature를 고르면서 동시에 feature를 큰 규모로 변형 가능함. 

births.select([
        'INFANT_NICU_ADMISSION', 
        rec_integer(
            'INFANT_NICU_ADMISSION', func.lit('YNU')
        ) \
        .alias('INFANT_NICU_ADMISSION_RECODE')]
     ).take(5)

→ INFANT_NICU_ADMISSION 컬럼을 고르고, 이 컬럼명을 rec_integer 함수에 전달

→ 새롭게 변형된 컬럼명을 INFANT_NICU_ADMISSION_RECORD라고 이름 붙임 

 

 

 

- YNU_cols를 한번에 변형하기 위해 트랜스포메이션 리스트 생성하자

exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x) 
    if x in YNU_cols 
    else x 
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)
births_transformed.select(YNU_cols[-5:]).show(5)