반전공자

[PySpark] 데이터프레임 시나리오 : 비행 기록 성능 본문

데이터분석/PySpark

[PySpark] 데이터프레임 시나리오 : 비행 기록 성능

하연01 2023. 3. 11. 00:02

토마스 드라마브, 데니 리의 "PySpark 배우기"를 보고 배워나가는 과정을 기록한 글입니다 ♪


비행기록 성능 유스케이스 

- 항공사의 지연율과 비행 지연의 원인에 대해 분석

- 비행 지연의 여러 변수를 확인하기 위해 공항 데이터셋과 조인

※ 비행 지연과 관련된 변수를 더 잘 이해하려는 목적! 

 

 

1. 출발지 데이터셋 준비하기

# 파일 경로 설정
>>> flightPerfFilePath = '/Users/hayeon/Downloads/departuredelays.csv'
>>> airportsFilePath = '/Users/hayeon/Downloads/airport-codes-na.txt'

# 공항 데이터셋 획득
>>> airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
>>> airports.createOrReplaceTempView("airports")

# 출발지 지연 데이터셋 획득
>>> flightPerf = spark.read.csv(flightPerfFilePath, header='true')
>>> flightPerf.createOrReplaceTempView("FlightPerformance")

◎ CSV 리더를 이용해 데이터를 임포트 (구분자 지정 가능)

 

# 출발지 지연 데이터셋 캐시
>>> flightPerf.cache()
DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

쿼리가 더 빠르게 수행될 수 있도록 비행 데이터셋 캐시

 

 

 

2. 비행 성능 데이터셋과 공항 데이터셋 조인

 

- 조인 자체가 성능에 부담이 되는 작업이지만, 데이터프레임 내에는 조인들에 대해 많은 최적화가 기본으로 포함되어 있음

★ 워싱턴 주의 전체 지연을 도시와 출발지 코드에 따라 쿼리 

      → IATA(International Air Transport Association) 코드를 기준으로 조인해야 함

>>> spark.sql("""
...     select a.City,
...     f.origin,
...     sum(f.delay) as Delays
...     from FlightPerformance f
...     join airports a
...     on a.IATA = f.origin
...     where a.State = 'WA'
...     group by a.City, f.origin
...     order by sum(f.delay) desc"""
... ).show()


+-------+------+--------+                                                       
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+

 

* 교재에서는 데이터브릭스 노트북에서 바 차트로 시각화했는데 난 시각화는 패스.. ㅠㅠ~