일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- pyspark
- 태블로
- ionehotencoding
- 데이터 분석
- 파이썬
- Python
- selenium
- 형태소분석기
- Udemy
- konlpy
- iNT
- pandas
- 데이터
- 데이터분석
- Tableau
- input
- Okt
- 머신러닝
- SQL
- 크롤링
- 시각화
- 인공지능
- scikit-learn
- Word Cloud
- numpy
- Today
- Total
반전공자
[PySpark] 데이터프레임 - API, SQL 본문
토마스 드라마브, 데니 리의 "PySpark 배우기"를 보고 배워나가는 과정을 기록한 글입니다 ♪
데이터프레임
- 관계형 DB의 테이블에서 칼럼 이름으로 구성된 변경 불가능한 분산 데이터 컬렉션
- 분산된 데이터 컬렉션에 구조체를 씌움으로써 스파크 사용자는 스파크 SQL로 구조적 데이터를 쿼리하거나 람다 대신 표현함수 사용 가능
- 데이터를 구조적으로 바꾸면서 스파크 엔진의 스파크 쿼리 성능이 크게 향상!
파이썬에서의 RDD 커뮤니케이션
- 모든 RDD 트랜스포메이션은 최초에 파이썬 RDD 자바 객체로 매핑된다.
- 작업들이 스파크 워커에 푸시됐을 떄, 파이썬 RDD 객체는 파이썬이 처리할 코드와 데이터를 보내기 위해 파이프로 파이썬 subprocess를 실행
- 정리하자면,,,
PySpark 드라이버 내에서 스파크 context는 Py4j를 이용해서 JavaSparkContext로 JVM을 실행, 받은 작업은 스파크 워커에 푸시되고, 파이프를 통해 파이썬으로 넘긴다.
* 위의 방법을 통해 PySpark는 여러 개의 워커 노드에 있는 여러 파이썬 subprocess로 데이터 처리 분배
* vs 파이썬과 JVM 간 많은 context switching과 커뮤니케이션 오버헤드 존재
카탈리스트 옵티마이저 리뷰
- 스파크 SQL 엔진이 빠른 주요 이유 = 카탈리스트 옵티마이저
- RDBMS의 비용 모델/비용 기반 최적화와 유사
카탈리스트 옵티마이저 기능
1. 논리적 플랜을 컴파일하고 최적화
2. 가장 효과적인 물리적 플랜을 결정 (비용 옵티마이저)
옵티마이저의 디자인 목적
1. 새로운 최적화 테크닉, 기능을 스파크 SQL에 쉽게 추가하기 위함
2. 외부 개발자들이 옵티마이저를 확장하기 편하게 하기 위함
ex) 데이터 소스 기반 룰을 추가 or 새로운 데이터 타입 지원 등 추가 개발을 쉽게 하려고 ...
데이터프레임을 이용한 파이스파크 스피드업
- 파이썬 쿼리는 RDD를 이용하는 스칼라 쿼리보다 두배 느림 → 파이썬과 JVM에서의 오버헤드로 인해 발생
- 데이터프레임을 파이썬과 함꼐 사용하면 파이썬 성능 크게 향상 & 파이썬, 스칼라, SQL, R 성능 일정해짐
* 파이썬 데이터프레임, SQL, 스칼라 데이터프레임, R 데이터프레임은 카탈리스트 옵티마이저 사용 가능
데이터프레임 생성
- SparkSession을 사용해 데이터 import하는 방식으로 생성
예제) 파일시스템 접근 말고 데이터 생성 먼저 해보자
→ stringJSONRDD로 RDD 먼저 생성 후 데이터프레임으로 변환
수영 선수 RDD(ID, 이름, 나이, 눈 색깔)를 JSON 포맷으로 생성!
1. JSON 데이터 생성하기
>>> stringJSONRDD = sc.parallelize((
... """{"id":"123", "name":"Katie", "age":19, "eyeColor":"brown"}""",
... """{"id":"234", "name":"Michael", "age":22, "eyeColor":"green"}""",
... """{"id":"345", "name":"Simone", "age":23, "eyeColor":"blue"}"""
... ))
→ RDD 생성 완료!
2. 데이터프레임 생성하기
swimmerJSON = spark.read.json(stringJSONRDD)
→ 아주 손쉽게 데이터프레임 생성 완료!
- 만든 RDD 객체를 read.json으로 불러오면 데이터프레임이 생성된다.
3. 임시 테이블 생성하기
swimmerJSON.createOrReplaceTempView("swimmerJSON")
- 많은 RDD 함수는 액션함수 실행 이전까지 실행되지 않는 트랜스포메이션이다.
ex) sc.parallelize는 spark.read.json을 사용해 RDD를 데이터프레임으로 변환할 떄 사용되는 트랜스포메이션
※ 스파크 job은 spark.read.json을 포함하는 위 셀 이전까진 실행되지 않음.
※ 임시 테이블 생성은 데이터프레임 트랜스포메이션이며, 데이터프레임 액션 실행 이전까지 실행되지 않는다.
예시로 다음 데이터프레임 쿼리를 살펴보도록 한다.
간단한 데이터프레임 쿼리
swimerJSON 데이터프레임을 생성했으니, 데이터프레임 API를 SQL 쿼리처럼 실행 가능하다.
데이터프레임 API 쿼리
- 데이터프레임 API 사용하기 위해 처음 n개의 행을 콘솔에 출력하는 show() 함수 사용
cf) show() 함수는 디폴트로 10줄 출력
>>> swimmerJSON.show()
SQL 쿼리
>>> spark.sql("select * from swimmerJSON").collect()
* collect() : 모든 데이터를 행 객체로 변환
* collect(), show() 함수는 데이터프레임과 SQL 쿼리에 대해 사용 가능
* collect() 함수는 데이터프레임의 모든 행 리턴 후 실행 노드에서 드라이버 노드로 이동하므로 작은 데이터프레임에 사용하자
→ 행 개수를 제한해서 take(n) or show(n) 사용하는걸 권장함.
[Row(age=19, eyeColor='brown', id='123', name='Katie'),
Row(age=22, eyeColor='green', id='234', name='Michael'),
Row(age=23, eyeColor='blue', id='345', name='Simone')]
RDD로 연동하기
* 기존의 RDD를 데이터프레임으로 변경하는 방법
1. reflection 사용해 스키마 추측 → 자세한 코드 작성 가능하게 함
2. 스키마를 직접 코드 상에 명시 → 열과 데이터 타입이 런타임에 드러날 때 데이터프레임의 구조 만들게 함
1. 리플렉션을 이용한 스키마 추측
- 스파크 SQL은 행 객체의 RDD를 데이터프레임으로 변경
→ 키 = column, 데이터 타입 = 데이터 샘플링으로 추측
- swimmerJSON 데이터프레임을 printSchema() 함수로 스키마 직접적 명시하지 않고 스키마 정의 확인하기
>>> swimmerJSON.printSchema()
root
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
if, id가 원래는 Long 타입이 아니라 string 타입이기 때문에 스키마 명시하고 싶다면??
2. 프로그래밍하는 것처럼 스키마 명시
- 위 if 같은 경우에 스파크 SQL 데이터 타입을 가져와서 프로그래밍으로 스키마를 명시해주자
>>> from pyspark.sql.types import *
>>> stringCSVRDD = sc.parallelize([
... (123, 'Katie', 19, 'brown'),
... (234, 'Michael', 22, 'green'),
... (345, 'Simone', 23, 'blue')
- types를 임포트
- 콤마로 분리된 데이터 생성
→ 각 변수마다 스키마를 스트링으로 인코딩, 그 후 아래에서 스키마를 StructType과 StructField를 이용해 정의
>>> schema = StructType([
... StructField("id", LongType(), True),
... StructField("name", StringType(), True),
... StructField("age", LongType(), True),
... StructField("eyeColor", StringType(), True)
... ])
- StructField 클래스
* name : 필드 이름
* dataType : 필드의 데이터 타입
* nullable : 필드의 널 가능 여부
이제 생성한 schema를 stringCSVRDD에 적용하고, SQL을 이용해 쿼리할 수 있게끔 임시 뷰 생성하기
>>> swimmers = spark.createDataFrame(stringCSVRDD, schema)
→ RDD에 스키마 적용 후 데이터프레임 생성
>>> swimmers.createOrReplaceTempView("swimmers")
→ 데이터프레임을 이용해 임시 뷰 생성
>>> swimmers.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
→ 이전 스파크가 자체적으로 데이터 샘플링으로 데이터 타입을 추측했을 땐 id의 데이터 타입이 string 이었지만
자세하게 명시하고나니 long으로 데이터 타입이 바뀐 것을 확인할 수 있다.
데이터프레임 API로 쿼리
- collect(), show(), take() 함수로 데이터프레임의 데이터 확인 가능
- show(), take() 함수는 리턴되는 행 개수 제한하는 옵션 포함
행의 개수
- count()
>>> swimmers.count()
3
필터문 실행
- 아래 두 쿼리는 동일한 결과를 추출한다.
▶ age가 22인 데이터의 id와 age를 출력
>>> swimmers.select("id", "age").filter("age=22").show()
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
>>> swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age==22).show()
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
▶ 눈 색깔이 b로 시작하는 수영선수의 이름?
>>> swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()
+------+--------+
| name|eyeColor|
+------+--------+
| Katie| brown|
|Simone| blue|
+------+--------+
SQL로 쿼리
- 위에 사용된 데이터프레임으로 SQL 쿼리 실행하려 한다.
cf) .createOrReplaceTempView 함수 실행했으므로 데이터프레임 접근 가능!
행의 개수
>>> spark.sql("select count(1) from swimmers").show()
+--------+
|count(1)|
+--------+
| 3|
+--------+
필터문을 where 절을 사용해 실행
>>> spark.sql("select id, age from swimmers where age=22").show()
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
>>> spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
+------+--------+
| name|eyeColor|
+------+--------+
| Katie| brown|
|Simone| blue|
+------+--------+
→ 데이터프레임 API로 쿼리에서 다룬 쿼리와 동일하며, 동일한 결과를 추출했다.
'데이터분석 > PySpark' 카테고리의 다른 글
[PySpark] 모델링 준비하기 - 아웃라이어 (0) | 2023.03.14 |
---|---|
[PySpark] 데이터프레임 시나리오 : 비행 기록 성능 (0) | 2023.03.11 |
[PySpark] 액션 - count(), saveAsTextFile(), foreach() (0) | 2023.03.10 |
[PySpark] 액션 - take(), collect(), reduce() (0) | 2023.03.10 |
[PySpark] 트랜스포메이션 - sample(), leftOuterJoin(), repartition() (0) | 2023.03.10 |