1 .csv 파일 → 로컬 스파크

AWS S3 저장소에 데이터를 저장해서 활용하고, 스파크 클러스터를 별도 EC2 인스턴스로 묶어 데이터를 분석하는 것이 많이 활용되는 패턴 중 하다. 특히 S3를 s3fs로 EC2와 동기화(sync)한 경우 마치 로컬 파일처럼 접근해서 데이터를 분석하는 것도 가능하다.

AWS CSV 파일

그전에 로컬 컴퓨터에 저장된 .csv 파일을 스파크 로컬 클러스터로 불러와서 데이터 분석작업을 수행하는 가장 간단한 사례를 상정해보자.

로컬 CSV 파일

2 로컬 스파크 클러스터 생성

로컬 컴퓨터에 저장된 .csv 파일을 스파크 클러스터로 가져와서 분석을 진행하고자 할 때 먼저 로컬 컴퓨터 .csv 파일을 준비하고 스파크 클러스터를 로컬에 생성시킨다.

[1] '2.3.0'

3 .csv → 로컬 스파크 클러스터 불러오기

스파크 클러스터가 별 문제 없이 생성된 경우 다음 작업으로 sparklyr 팩키지 spark_read_csv() 함수를 사용해서 데이터를 가져온다.

캐글, “2015 Flight Delays and Cancellations - Which airline should you fly on to avoid significant delays?” 사이트에서 flights.csv 파일을 다운로드 받는다. flights.csv 파일 크기는 579MB 정도 크기를 갖는다.

# Source: spark<flights_spark> [?? x 31]
    YEAR MONTH   DAY DAY_OF_WEEK AIRLINE FLIGHT_NUMBER TAIL_NUMBER
   <int> <int> <int>       <int> <chr>           <int> <chr>      
 1  2015     1     1           4 AS                 98 N407AS     
 2  2015     1     1           4 AA               2336 N3KUAA     
 3  2015     1     1           4 US                840 N171US     
 4  2015     1     1           4 AA                258 N3HYAA     
 5  2015     1     1           4 AS                135 N527AS     
 6  2015     1     1           4 DL                806 N3730B     
 7  2015     1     1           4 NK                612 N635NK     
 8  2015     1     1           4 US               2013 N584UW     
 9  2015     1     1           4 AA               1112 N3LAAA     
10  2015     1     1           4 DL               1173 N826DN     
# ... with more rows, and 24 more variables: ORIGIN_AIRPORT <chr>,
#   DESTINATION_AIRPORT <chr>, SCHEDULED_DEPARTURE <int>,
#   DEPARTURE_TIME <int>, DEPARTURE_DELAY <int>, TAXI_OUT <int>,
#   WHEELS_OFF <int>, SCHEDULED_TIME <int>, ELAPSED_TIME <int>,
#   AIR_TIME <int>, DISTANCE <int>, WHEELS_ON <int>, TAXI_IN <int>,
#   SCHEDULED_ARRIVAL <int>, ARRIVAL_TIME <int>, ARRIVAL_DELAY <int>,
#   DIVERTED <int>, CANCELLED <int>, CANCELLATION_REASON <chr>,
#   AIR_SYSTEM_DELAY <int>, SECURITY_DELAY <int>, AIRLINE_DELAY <int>,
#   LATE_AIRCRAFT_DELAY <int>, WEATHER_DELAY <int>

4 분석을 위한 기초작업

분석에 필요한 필드를 추출해서 sdf_register() 함수로 스파크 데이터프레임으로 만든다. tbl_cache() 함수로 캐쉬에 등록해서 빠른 데이터 분석 및 모형 개발이 되도록 조치를 취한다.

5 탐색적 데이터 분석

tally 함수를 통해서 전체 행수를 산출해 본다. 그리고 나서, count() 등을 통해서 본격적인 탐색적 데이터 분석작업에 들어간다.

# Source: spark<?> [?? x 1]
        n
    <dbl>
1 5714008
# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
   AIRLINE       n
   <chr>     <dbl>
 1 WN      1242403
 2 DL       870275
 3 AA       712935
 4 OO       576814
 5 EV       554752
 6 UA       507762
 7 MQ       278791
 8 B6       262042
 9 US       194223
10 AS       171439
# ... with more rows

6 회귀모형

스파크에 내장된 기계학습 알고리즘을 통해서 예측 모형 구축 작업에 들어간다.

Deviance Residuals (approximate):
      Min        1Q    Median        3Q       Max 
-135.9491   -5.6981    0.6724    6.6215  184.1516 

Coefficients:
  (Intercept) ARRIVAL_DELAY      DISTANCE 
  2.979332642   0.888530551   0.002910648 

R-Squared: 0.8947
Root Mean Squared Error: 11.97
NULL