1. 왜 병렬 데이터 처리인가?

병렬 컴퓨팅을 통해 알고리즘을 병렬적으로 처리하는 방법은 많이 소개되었지만, 데이터 분석에서 병렬 데이터 처리에 대한 내용은 생소할 수 있다. dplyr 팩키지의 group_by() 함수를 자주 사용하는 경우 어떻게 보면 병렬 데이터 처리에 가장 적합한 것으로 볼 수 있다.

multidplyr은 CPU 코어가 다수인 경우 데이터프레임을 칸막이로 분할(partition)하여 dplyr을 적용할 수 있게 개발된 팩키지다. 일단 Hadley Wikckham이 개발을 한 것으로 tidyverse 생태계의 일원이라면 병렬 데이터 처리 영역까지 분야를 넓힐 수 있다. 병렬 처리는 일단 partition() 함수로 데이터를 쪼개 개별 CPU 코어에서 작업을 병렬처리하고 나서 collect() 하여 수집하는 과정을 거쳐 병렬작업이 마무리 된다.

PARTOOLS: A SENSIBLE R PACKAGE FOR LARGE DATA SETS와 HP Vertica Analytics팀에서 개발한 DistributedR에서 영감을 받아 개발함.

1.1. multidplyr 팩키지 설치

devtools::install_github("hadley/multidplyr") 명령어를 통해 간단하게 multidplyr 팩키지를 설치할 수 있음.

2. 병렬 데이터 분석

multidplyr 팩키지를 설치하고 나면 dplyr을 활용한 것과 multidplyr 팩키지를 사용한 것이 무엇이 다른지 이해해야 한다.

2.1. 헬로우 월드 1

multidplyrdplyr을 그대로 사용할 수 있다. 다만, group_by() 대신에 partition()을 사용하는 것이 차이점이 된다. 앞에서 언급했듯이 partition() 명령어로 분할하여 병렬처리하고 나면 이를 다시 모으는 과정 즉 collect()가 필요하다.

두가지 처리과정 모두 동일한 결과가 산출되지만, multidplyr을 적용한 경우는 순서가 다를 수 있다. 왜냐하면 각 코어마다 작업을 나눠 처리하는 과정에서 빨리 작업을 처리한 코어가 가장 먼저 결과를 반환하고 해서 collect() 단계에서 차이가 날 수 있기 때문이다.

library(dplyr)  
library(multidplyr)

# 1. 기본 함수 -------------------------------------

## 1.1. dplyr
airquality %>% 
    group_by(Month) %>% 
    summarize(cnt = n())
# A tibble: 5 x 2
  Month   cnt
  <int> <int>
1     5    31
2     6    30
3     7    31
4     8    31
5     9    30
## 1.2. multidplyr
airquality %>% 
    partition(Month) %>% 
    summarize(cnt = n()) %>% 
    collect()  
# A tibble: 5 x 2
  Month   cnt
  <int> <int>
1     9    30
2     6    30
3     5    31
4     7    31
5     8    31

2.2. 클러스터 생성

create_cluster 명령어로 원하는 코어 갯수에 맞춰 클러스터를 구성한다. set_default_cluster 명령어로 클러스터 코어갯수를 디폴트 기본 설정한다.

dplyr 함수는 자체로 사용할 수 있지만, 사용자 정의 함수와 변수는 cluster_assign_value 함수를 통해 등록을 해줘야 클러스터에서 사용할 수 있다.

# 2. 클러스터 관리 -------------------------------------
## 2.1. 클러스터 생성
cluster <- create_cluster(3)
    
airquality %>% 
    partition(Month, cluster = cluster) %>% 
    summarize(cnt = n()) %>% 
    collect()
# A tibble: 5 x 2
  Month   cnt
  <int> <int>
1     5    31
2     7    31
3     8    31
4     6    30
5     9    30
# 디폴트 클러스터 설정
set_default_cluster(cluster)  
airquality %>% 
    partition(Month) %>% 
    summarize(cnt = n()) %>% 
    collect()  
# A tibble: 5 x 2
  Month   cnt
  <int> <int>
1     9    30
2     6    30
3     7    31
4     5    31
5     8    31
# 3. 사용자 정의 함수 -------------------------------------
four <- function(x) {  
    return(data.frame(a = 4))
}

one <- 1

# Register function and variable
cluster_assign_value(cluster, 'four', four)  
cluster_assign_value(cluster, 'one', one)  

cluster_get(cluster, 'one')  
[[1]]
[1] 1

[[2]]
[1] 1

[[3]]
[1] 1
cluster_rm(cluster, c('four', 'one'))  

cluster %>% cluster_ls()  
[[1]]
[1] "bcopkcxenm"    "exhpaecgcx"    "kizovxrwcc"    "lavfxmhdeu"   
[5] "theme_gogamza"

[[2]]
[1] "bcopkcxenm"    "exhpaecgcx"    "kizovxrwcc"    "lavfxmhdeu"   
[5] "theme_gogamza"

[[3]]
[1] "bcopkcxenm"    "exhpaecgcx"    "kizovxrwcc"    "lavfxmhdeu"   
[5] "theme_gogamza"

3. 코스피 200 주식데이터 가져오기 2

S&P500 주식을 rvestmultiplyr 팩키지를 활용한 사례를 참조하여 코스피 200(KOSPI 200) 주식 데이터를 가져오는 사례로 재구성했다.

3.1. 코스피 200 주식 데이터 정리

코스피 200 주식정보가 정리된 것이 없어 웹사이트에서 대한민국 거래되는 주식정보를 긁어오고, 코스피 200에 해당되는 기업정보도 나무위키에서 긁어온다. 그리고 나서 코스피 200에 포함된 기업정보는 161개로 확인된다.

# 0. 환경설정 --------------------------------------

# library(tidyverse)
# library(rvest)
# library(dplyr)
# library(multidplyr)
# library(magrittr)
# library(lubridate)
# library(parallel)
# library(rbenchmark)
# library(quantmod)

# 1. 데이터 가져오기 -----------------------------------------------------
## 1.1. 종목코드 ---------------------------------------------------------
kor_stock <- read_html("http://bigdata-trader.com/itemcodehelp.jsp") %>%
    html_table()

kospi <- kor_stock[[1]] %>% as_tibble() %>%
    dplyr::rename(symbol = X1,
                  company = X2,
                  type = X3) %>% 
    dplyr::filter(type=="KOSPI")

## 1.2. 코스피 200 ---------------------------------------------------------

kospi_lst <- read_html("https://namu.wiki/w/KOSPI200") %>%
    html_table(fill=TRUE)

kospi_200 <- kospi_lst[[1]] %>% as_tibble() %>% 
    dplyr::slice(-1) %>% 
    dplyr::select(1:5) %>% 
    gather(group, company) %>% 
    dplyr::select(company)

## 1.3. 코스피 200 코드와 병합 ---------------------------------------------------------

kospi_200_df <- inner_join(kospi, kospi_200, by="company") %>% 
    mutate(symbol = paste0(symbol, ":KOSPI"))

3.2. 주식데이터 가져오는 함수

최근에 야후 금융에서 데이터를 가져오는 것이 문제가 있어, src="google"로 지정한다. quantmod 팩키지 getSymbols() 함수의 기본 반환값은 xts 자료형이라 이를 tibble로 변환한다.

# 2. 주식가격 가져오는 함수 ---------------------------------------------------------

get_stock_prices <- function(symbol, return_format = "tibble", from=from, to=to) {
    # 구글에서 주식데이터 가져오기
    stock_prices <- tryCatch({
        getSymbols(Symbols = symbol, auto.assign = FALSE, src = "google",  from=from, to=to)
    }, error = function(e) {
        return(NA)
    })
    if (!is.na(stock_prices[[1]])) {
        # 변수명 재설정
        names(stock_prices) <- c("시가", "고가", "저가", "종가", "거래량")
        # 기본설정은 xts 파일형식을 tibble로 변환
        if (return_format == "tibble") {
            stock_prices <- stock_prices %>%
                as_tibble() %>%
                rownames_to_column(var = "Date") %>%
                mutate(Date = ymd(Date))
        }
        return(stock_prices)
    }
}

from <- "2017-05-01"
to   <- today()

getSymbols(Symbols = kospi_200_df[1,]$symbol, auto.assign = FALSE, src = "google",  from=from, to=to) %>% head()
           000120:KOSPI.Open 000120:KOSPI.High 000120:KOSPI.Low
2017-05-02           2213.61           2229.74          2212.87
2017-05-04           2224.91           2241.24          2224.91
2017-05-08           2245.61           2292.76          2244.23
2017-05-10           2294.10           2323.22          2264.31
2017-05-11           2278.47           2297.67          2271.66
2017-05-12           2296.06           2296.51          2283.38
           000120:KOSPI.Close 000120:KOSPI.Volume
2017-05-02            2219.67           295139000
2017-05-04            2241.24           261918000
2017-05-08            2292.76           281761000
2017-05-10            2270.12           422888000
2017-05-11            2296.37           463460000
2017-05-12            2286.02           335337000

3.3. 순차처리 방식으로 주식데이터 가져오기

순차처리방식으로 데이터를 가져오는 경우 내부에 map함수를 통해 함수형 프로그래밍 기법을 적용하여 데이터를 구글 금융에서 받아오지만 순차적으로 쭉 받아오게 된다.

# 3. 순차 처리 방식 ---------------------------------------------------------

get_stock_prices_from_google <- function(kospi_200_input, from, to){ 
    kospi_200_sequential_df <- kospi_200_input %>%
        mutate(
            stock.prices = map(.x = symbol, 
                               ~ get_stock_prices(symbol = .x,
                                                  return_format = "tibble",
                                                  from = from,
                                                  to   = to)
            )
        )
    return(kospi_200_sequential_df)
}

get_stock_prices_from_google(kospi_200_df[1,], from, to)
# A tibble: 1 x 4
        symbol    company  type      stock.prices
         <chr>      <chr> <chr>            <list>
1 000120:KOSPI CJ대한통운 KOSPI <tibble [15 x 6]>

3.4. 병렬처리 방식으로 주식데이터 가져오기

병렬처리 방식으로 데이터를 가져오기 위해서는 사전에 parallel::detectCores() 함수를 활용하여 코어 숫자를 확인하고 이를 create_cluster 함수로 클러스터를 생성시킬 때 반영한다.

kospi_200_df 데이터가 161개 행으로 구성되니 이를 코어숫자에 맞춰 그룹을 생성시킨다. partition 함수를 통해 그룹을 쪼갠다. 쪼개진 그룹은 병렬처리를 위해 각 코어에 매칭된다.

dplyr을 제외한 다른 팩키지와 사용자 정의 함수와 변수는 cluster_library, cluster_assign_value 함수로 등록시킨다.

cluster_eval 함수를 통해 클러스터에 설정된 상황을 확인할 수 있다.

# 4. 병렬 처리 방식 ---------------------------------------------------------
## 4.1. 클러스터 생성 -------------------------------------------------------
cl <- detectCores() -1
cluster <- create_cluster(cores = cl)

## 4.2. 병렬처리 그룹 생성 -------------------------------------------------------
group <- rep(1:cl, length.out = nrow(kospi_200_df))
kospi_200_df <- bind_cols(tibble(group), kospi_200_df)

by_group <- kospi_200_df %>%
    partition(group, cluster = cluster)

## 4.3. 클러스터에 dplyr을 제외한 팩키지, 함수, 변수 등록 -------------------------------------------------------
by_group %>%
    # 팩키지 등록
    cluster_library("tidyverse") %>%
    cluster_library("stringr") %>%
    cluster_library("lubridate") %>%
    cluster_library("quantmod") %>%
    # 사용자 정의 함수와 변수 등록
    cluster_assign_value("get_stock_prices", get_stock_prices) %>%
    cluster_assign_value("from", from) %>%
    cluster_assign_value("to", to)

# 첫번째 클러스터 설정상황 확인
cluster_eval(by_group, search())[[1]]
 [1] ".GlobalEnv"        "package:quantmod"  "package:TTR"      
 [4] "package:xts"       "package:zoo"       "package:lubridate"
 [7] "package:stringr"   "package:dplyr"     "package:purrr"    
[10] "package:readr"     "package:tidyr"     "package:tibble"   
[13] "package:tidyverse" "package:stats"     "package:graphics" 
[16] "package:grDevices" "package:utils"     "package:datasets" 
[19] "KoreaEnv"          "package:extrafont" "package:ggthemes" 
[22] "package:ggplot2"   "package:methods"   "Autoloads"        
[25] "package:base"     
cluster_get(by_group, "get_stock_prices")[[1]]
function(symbol, return_format = "tibble", from=from, to=to) {
    # 구글에서 주식데이터 가져오기
    stock_prices <- tryCatch({
        getSymbols(Symbols = symbol, auto.assign = FALSE, src = "google",  from=from, to=to)
    }, error = function(e) {
        return(NA)
    })
    if (!is.na(stock_prices[[1]])) {
        # 변수명 재설정
        names(stock_prices) <- c("시가", "고가", "저가", "종가", "거래량")
        # 기본설정은 xts 파일형식을 tibble로 변환
        if (return_format == "tibble") {
            stock_prices <- stock_prices %>%
                as_tibble() %>%
                rownames_to_column(var = "Date") %>%
                mutate(Date = ymd(Date))
        }
        return(stock_prices)
    }
}
## 4.4. 병렬처리 클러스터 확인 ---------------------------------------------------------

get_stock_prices_from_google <- function(kospi_200_input, from, to){ 
    
    kospi_200_parallel_df <- kospi_200_input %>%
        mutate(
            stock.prices = map(.x = symbol, 
                               ~ get_stock_prices(symbol = .x,
                                                  return_format = "tibble",
                                                  from = from,
                                                  to   = to)
            )
        ) %>%
        collect() %>%
        as_tibble() 
    return(kospi_200_parallel_df)
}

3.5. 순차처리와 병렬처리 성능 비교

저자 PC에서 돌린 성능과 노트북으로 돌린 성능을 비교하자. 저자 PC는 코어가 4개, 노트북은 코어가 8개가 된다. 성능이 코어숫자에 비례하여 증가하지만 코어 상호간에 커뮤니케이션 비용으로 인해 코어가 증가해도 100%로 성능향상으로 이어지는 것은 아니다.

system.time(get_stock_prices_from_google(kospi_200_df, from, to))
   user  system elapsed 
  2.017   0.217  76.995 
# 사용자  시스템 elapsed 
# 16.29    1.36   97.34 

system.time(get_stock_prices_from_google(by_group, from, to))
   user  system elapsed 
  0.007   0.001  10.644 
# 사용자  시스템 elapsed 
# 0.11    0.03   35.83 

  1. MULTIDPLYR - DPLYR MEETS PARALLEL PROCESSING

  2. SPEED UP YOUR CODE: PARALLEL PROCESSING WITH MULTIDPLYR