.csv
→ 로컬 스파크 - sparklyr
.csv
파일 → 로컬 스파크AWS S3 저장소에 데이터를 저장해서 활용하고, 스파크 클러스터를 별도 EC2 인스턴스로 묶어 데이터를 분석하는 것이 많이 활용되는 패턴 중 하다. 특히 S3를 s3fs
로 EC2와 동기화(sync)한 경우 마치 로컬 파일처럼 접근해서 데이터를 분석하는 것도 가능하다.
그전에 로컬 컴퓨터에 저장된 .csv
파일을 스파크 로컬 클러스터로 불러와서 데이터 분석작업을 수행하는 가장 간단한 사례를 상정해보자.
로컬 컴퓨터에 저장된 .csv
파일을 스파크 클러스터로 가져와서 분석을 진행하고자 할 때 먼저 로컬 컴퓨터 .csv
파일을 준비하고 스파크 클러스터를 로컬에 생성시킨다.
# 1. 라이브러리
library(sparklyr)
library(tidyverse)
## 설치된 SPARK 버젼확인 설정
# Sys.getenv("SPARK_HOME")
# spark_home_dir()
# spark_installed_versions()
# 2. 스파크 클러스터 연결
sc <- spark_connect(master="local")
## 2.1. 스파크 버젼 확인
spark_version(sc=sc)
[1] '2.3.0'
.csv
→ 로컬 스파크 클러스터 불러오기스파크 클러스터가 별 문제 없이 생성된 경우 다음 작업으로 sparklyr
팩키지 spark_read_csv()
함수를 사용해서 데이터를 가져온다.
캐글, “2015 Flight Delays and Cancellations - Which airline should you fly on to avoid significant delays?” 사이트에서 flights.csv
파일을 다운로드 받는다. flights.csv
파일 크기는 579MB 정도 크기를 갖는다.
# 4. 로컬 CSV 파일 불러오기 -------------------------
flights <- spark_read_csv(sc, "flights_spark",
path = "data/flights.csv",
memory = TRUE,
infer_schema = TRUE)
flights
# Source: spark<flights_spark> [?? x 31]
YEAR MONTH DAY DAY_OF_WEEK AIRLINE FLIGHT_NUMBER TAIL_NUMBER
<int> <int> <int> <int> <chr> <int> <chr>
1 2015 1 1 4 AS 98 N407AS
2 2015 1 1 4 AA 2336 N3KUAA
3 2015 1 1 4 US 840 N171US
4 2015 1 1 4 AA 258 N3HYAA
5 2015 1 1 4 AS 135 N527AS
6 2015 1 1 4 DL 806 N3730B
7 2015 1 1 4 NK 612 N635NK
8 2015 1 1 4 US 2013 N584UW
9 2015 1 1 4 AA 1112 N3LAAA
10 2015 1 1 4 DL 1173 N826DN
# ... with more rows, and 24 more variables: ORIGIN_AIRPORT <chr>,
# DESTINATION_AIRPORT <chr>, SCHEDULED_DEPARTURE <int>,
# DEPARTURE_TIME <int>, DEPARTURE_DELAY <int>, TAXI_OUT <int>,
# WHEELS_OFF <int>, SCHEDULED_TIME <int>, ELAPSED_TIME <int>,
# AIR_TIME <int>, DISTANCE <int>, WHEELS_ON <int>, TAXI_IN <int>,
# SCHEDULED_ARRIVAL <int>, ARRIVAL_TIME <int>, ARRIVAL_DELAY <int>,
# DIVERTED <int>, CANCELLED <int>, CANCELLATION_REASON <chr>,
# AIR_SYSTEM_DELAY <int>, SECURITY_DELAY <int>, AIRLINE_DELAY <int>,
# LATE_AIRCRAFT_DELAY <int>, WEATHER_DELAY <int>
분석에 필요한 필드를 추출해서 sdf_register()
함수로 스파크 데이터프레임으로 만든다. tbl_cache()
함수로 캐쉬에 등록해서 빠른 데이터 분석 및 모형 개발이 되도록 조치를 취한다.
tidy_flights <- tbl(sc, "flights_spark") %>%
mutate(ARRIVAL_DELAY = as.integer(ARRIVAL_DELAY),
DEPARTURE_DELAY = as.integer(DEPARTURE_DELAY),
DISTANCE = as.integer(DISTANCE)) %>%
filter(!is.na(ARRIVAL_DELAY)) %>%
select(ARRIVAL_DELAY, DEPARTURE_DELAY, DISTANCE, AIRLINE ) %>%
sdf_register("tidy_spark")
tbl_cache(sc, "tidy_spark")
tally
함수를 통해서 전체 행수를 산출해 본다. 그리고 나서, count()
등을 통해서 본격적인 탐색적 데이터 분석작업에 들어간다.
# Source: spark<?> [?? x 1]
n
<dbl>
1 5714008
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
AIRLINE n
<chr> <dbl>
1 WN 1242403
2 DL 870275
3 AA 712935
4 OO 576814
5 EV 554752
6 UA 507762
7 MQ 278791
8 B6 262042
9 US 194223
10 AS 171439
# ... with more rows
스파크에 내장된 기계학습 알고리즘을 통해서 예측 모형 구축 작업에 들어간다.
## 회귀분석
simple_model <- tidy_flights %>%
ml_linear_regression(DEPARTURE_DELAY ~ ARRIVAL_DELAY + DISTANCE)
summary(simple_model)
Deviance Residuals (approximate):
Min 1Q Median 3Q Max
-135.9491 -5.6981 0.6724 6.6215 184.1516
Coefficients:
(Intercept) ARRIVAL_DELAY DISTANCE
2.979332642 0.888530551 0.002910648
R-Squared: 0.8947
Root Mean Squared Error: 11.97
NULL