일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 머신러닝
- 크롤링
- 형태소분석기
- numpy
- 인공지능
- scikit-learn
- ionehotencoding
- iNT
- Tableau
- pyspark
- 데이터분석
- Word Cloud
- 파이썬
- 태블로
- konlpy
- Udemy
- 시각화
- 데이터
- Python
- SQL
- selenium
- 데이터 분석
- pandas
- Okt
- input
- Today
- Total
반전공자
[PySpark] MLlib 소개하기 - 데이터셋 로드(NCHS) 본문
토마스 드라마브, 데니 리의 "PySpark 배우기"를 보고 배워나가는 과정을 기록한 글입니다 ♪
PySpark의 MLlib 패키지를 이용해 실제로 분류 모델을 학습해보자!
MLlib = Machine Learning Library
→ 스트리밍에 대해 학습 모델링을 지원하는 유일한 라이브러리
다룰 내용
- MLlib으로 모델링하기 위해 데이터 준비
- 통계적 테스트 수행
- 로지스틱 회귀를 통해 유아 생존율 예측
- 가장 예측 가능한 feature 선택과 랜덤 포레스트 모델 학습
패키지에 대한 개요
세 단계의 머신러닝 기능
1. 데이터 전처리 : feature 추출, 변형, 선택, 카테고리 feature에 대한 해싱, 자연어 처리 기술
2. 머신 러닝 알고리즘 : 몇몇 유명하고 고급 레벨인 회귀, 분류, 군집화 알고리즘이 개발되어 있음
3. 유틸리티 : 기술 통계, chi-square 테스트, 선형대수, 모델 평가 방법론과 같은 통계적 방법론
→ 대부분의 주요 데이터 사이언트 업무 수행 가능!
사용할 데이터
- US 2014, 2015 출생 데이터의 일부(http://www.tomdrabas.com/data/LearningPySpark/births_train.csv.gz)
데이터 로딩하기와 변형하기
- MLlib이 RDD, DStream에 중점을 두고 디자인되긴 했지만, 데이터 변형에 용이하도록 데이터프레임으로 변형하자
1. 데이터셋 스키마 명시
labels = [
('INFANT_ALIVE_AT_REPORT', typ.StringType()),
('BIRTH_YEAR', typ.IntegerType()),
('BIRTH_MONTH', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('MOTHER_RACE_6CODE', typ.StringType()),
('MOTHER_EDUCATION', typ.StringType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('FATHER_EDUCATION', typ.StringType()),
('MONTH_PRECARE_RECODE', typ.StringType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_BMI_RECODE', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.StringType()),
('DIABETES_GEST', typ.StringType()),
('HYP_TENS_PRE', typ.StringType()),
('HYP_TENS_GEST', typ.StringType()),
('PREV_BIRTH_PRETERM', typ.StringType()),
('NO_RISK', typ.StringType()),
('NO_INFECTIONS_REPORTED', typ.StringType()),
('LABOR_IND', typ.StringType()),
('LABOR_AUGM', typ.StringType()),
('STEROIDS', typ.StringType()),
('ANTIBIOTICS', typ.StringType()),
('ANESTHESIA', typ.StringType()),
('DELIV_METHOD_RECODE_COMB', typ.StringType()),
('ATTENDANT_BIRTH', typ.StringType()),
('APGAR_5', typ.IntegerType()),
('APGAR_5_RECODE', typ.StringType()),
('APGAR_10', typ.IntegerType()),
('APGAR_10_RECODE', typ.StringType()),
('INFANT_SEX', typ.StringType()),
('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
('INFANT_ASSIST_VENTI', typ.StringType()),
('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
('INFANT_NICU_ADMISSION', typ.StringType()),
('INFANT_SURFACANT', typ.StringType()),
('INFANT_ANTIBIOTICS', typ.StringType()),
('INFANT_SEIZURES', typ.StringType()),
('INFANT_NO_ABNORMALITIES', typ.StringType()),
('INFANT_ANCEPHALY', typ.StringType()),
('INFANT_MENINGOMYELOCELE', typ.StringType()),
('INFANT_LIMB_REDUCTION', typ.StringType()),
('INFANT_DOWN_SYNDROME', typ.StringType()),
('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
('INFANT_BREASTFED', typ.StringType())
]
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
2. 데이터 로드
births = spark.read.csv('/Users/hayeon/Downloads/births_train.csv.gz', header=True, schema=schema)
불러오고 바로 births.show() 를 해봤자 알아보기 힘든 데이터셋만 보인다.
때문에 레코드 사전을 먼저 명시해주자
2-1. 레코드 사전 명시
recode_dictionary = {
'YNU': {
'Y': 1,
'N': 0,
'U': 0
}
}
→ Yes를 1로 바꾼다는 의미의 RE 코드
※ 이번 장의 목적?
- INFANT_ALIVE_AT_REPORT(생존여부)가 1인지 0인지 예측
→ 상관되는 모든 feature는 제거하고, 유아의 생존율은 오로지 어머니, 아버지, 출생지와 관련된 feature 기반으로 예측할 예정
2-2. feature 선택
selected_features = [
'INFANT_ALIVE_AT_REPORT',
'BIRTH_PLACE',
'MOTHER_AGE_YEARS',
'FATHER_COMBINED_AGE',
'CIG_BEFORE',
'CIG_1_TRI',
'CIG_2_TRI',
'CIG_3_TRI',
'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT',
'MOTHER_DELIVERY_WEIGHT',
'MOTHER_WEIGHT_GAIN',
'DIABETES_PRE',
'DIABETES_GEST',
'HYP_TENS_PRE',
'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM'
]
births_trimmed = births.select(selected_features)
》 Yes = 1, No/Unknown = 0
※ 어머니의 흡연량(CIG ~)을 레코드할 때 문제?
- 흡연량 = 0 : 임신기간 동안 흡연하지 않음
- 1 < 흡연량 < 97 : 1 ~ 97개 사이
- 흡연량 = 98 : 98개 이상
- 흡연량 = 99 : Unknown이지만, 0으로 레코드 됨
2-3. 레코드 함수 정의
import pyspark.sql.functions as func
def recode(col, key):
return recode_dictionary[key][col]
def correct_cig(feat):
return func \
.when(func.col(feat) != 99, func.col(feat))\
.otherwise(0)
rec_integer = func.udf(recode, typ.IntegerType())
※ recode 함수
- recode_dictionary로부터 주어진 키 값에 대한 올바른 값을 찾고 올바른 값 리턴
※ correct_cig 함수
- feature의 값(feat)이 99와 다르면 그 값을 리턴, 99일 때 0을 갖게 됨
※ recode 함수를 데이터프레임에 직접 사용 불가
스파크가 이해할 수 있는 UDF로 변환해야 함 → rec_integer 함수로 가능
※ rec_integer 함수
- 명시된 recode 함수를 전달하고 리턴 값의 데이터 타입을 명시
≫ 이제 Yes / No / Unknown feature 인코드에 사용 가능!
2-4. 흡연량 관련 피쳐 고치기
births_transformed = births_trimmed \
.withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
.withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
.withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
.withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))
※ withColumn()
- 첫 번째 파라미터 : 컬럼명
- 두 번째 파라미터 : 트랜스포메이션
- Yes / No / Unknown 피쳐 번형
cols = [(col.name, col.dataType) for col in births_trimmed.schema]
YNU_cols = []
for i, s in enumerate(cols):
if s[1] == typ.StringType():
dis = births.select(s[0]) \
.distinct() \
.rdd \
.map(lambda row: row[0]) \
.collect()
if 'Y' in dis:
YNU_cols.append(s[0])
→ 컬럼명과 그에 상응하는 데이터 타입을 갖는 튜플 리스트 생성!
→ 루프를 돌아 모든 스트링 컬럼의 고유 값 계산 (= Y가 리스트 안에 있으면 해당 컬럼을 YNU_cols 리스트에 추가)
※ 데이터프레임은 feature를 고르면서 동시에 feature를 큰 규모로 변형 가능함.
births.select([
'INFANT_NICU_ADMISSION',
rec_integer(
'INFANT_NICU_ADMISSION', func.lit('YNU')
) \
.alias('INFANT_NICU_ADMISSION_RECODE')]
).take(5)
→ INFANT_NICU_ADMISSION 컬럼을 고르고, 이 컬럼명을 rec_integer 함수에 전달
→ 새롭게 변형된 컬럼명을 INFANT_NICU_ADMISSION_RECORD라고 이름 붙임
- YNU_cols를 한번에 변형하기 위해 트랜스포메이션 리스트 생성하자
exprs_YNU = [
rec_integer(x, func.lit('YNU')).alias(x)
if x in YNU_cols
else x
for x in births_transformed.columns
]
births_transformed = births_transformed.select(exprs_YNU)
births_transformed.select(YNU_cols[-5:]).show(5)
'데이터분석 > PySpark' 카테고리의 다른 글
[PySpark] 모델링 준비하기 - 기술통계, 상관계수 (0) | 2023.03.14 |
---|---|
[PySpark] 모델링 준비하기 - 아웃라이어 (0) | 2023.03.14 |
[PySpark] 데이터프레임 시나리오 : 비행 기록 성능 (0) | 2023.03.11 |
[PySpark] 데이터프레임 - API, SQL (0) | 2023.03.10 |
[PySpark] 액션 - count(), saveAsTextFile(), foreach() (0) | 2023.03.10 |