(빅)데이터를 앞서 RDD로 추상화한 후에 다양한 함수형 프로그래밍 기법을 사용해서 데이터 변환연산 작업을 수행하였다. 목적이 기계학습 예측모형을 개발하는 경우 RDD 데이터보다는 이를 추상화한 데이터프레임으로 작업하는 것이 수월한 경우가 많다. 크게 스파크 데이터프레임 동사를 학습하여 이를 적용하는 방식과 SQL 구문을 작성하여 작업하는 두가지 방식이 많이 사용된다.
R마크다운에서 pyspark
를 사용할 수 있도록 reticulate
팩키지 환경을 설정한다.
생성된 스파크에 접속할 수 있도록 변수를 생성시킨다.
튜플 리스트를 spark.createDataFrame
함수로 스파크 데이터프레임을 생성시킨다.
# 튜플 리스트
party_list = [('민주당',1), ('바른미래',2), ('자유한국',3), ('민주당',30), ('바른미래',20), ('자유한국',10)]
# 튜플리스트에서 RDD생성
party_rdd = sc.parallelize(party_list)
# 스파크 데이터프레임 생성
# party_df = spark.createDataFrame(party_rdd, schema=['정당', '득표수'])
# 자료형 확인
# print("party_df 자료형: ", type(party_df))
# party_df.show()
.csv
파일을 스파크 데이터프레임으로 변환iris.csv
파일을 스파크 데이터프레임으로 변환시킨다. 이를 위해서 .read.csv()
함수를 사용하고 .printSchema()
메쏘드로 데이터프레임 변수별 자료형을 확인할 수 있다.
# Create an DataFrame from people.csv file
iris_df = spark.read.csv("data/iris.csv", header=True, inferSchema=True)
# Print the schema of the DataFrame
iris_df.printSchema()
root
|-- sepal_length: double (nullable = true)
|-- sepal_width: double (nullable = true)
|-- petal_length: double (nullable = true)
|-- petal_width: double (nullable = true)
|-- species: string (nullable = true)
외부 데이터를 가져온 후에 다양한 메쏘드를 활용하여 데이터프레임 변환작업을 수행한다.
# 데이터프레임 일반현황
print("데이터프레임 행수: {}".format(iris_df.count()), "\n데이터프레임 열수: {}".format(len(iris_df.columns)), "\n변수명: ", iris_df.columns)
# 데이터프레임 변수 및 행 필터링
데이터프레임 행수: 150
데이터프레임 열수: 5
변수명: ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']
iris_subset_df = iris_df.select('sepal_length', 'petal_length', 'species').filter(iris_df.species =="setosa")
# 첫 관측점 10개만 추출
iris_subset_df.show(10)
+------------+------------+-------+
|sepal_length|petal_length|species|
+------------+------------+-------+
| 5.1| 1.4| setosa|
| 4.9| 1.4| setosa|
| 4.7| 1.3| setosa|
| 4.6| 1.5| setosa|
| 5.0| 1.4| setosa|
| 5.4| 1.7| setosa|
| 4.6| 1.4| setosa|
| 5.0| 1.5| setosa|
| 4.4| 1.4| setosa|
| 4.9| 1.5| setosa|
+------------+------------+-------+
only showing top 10 rows
외부 데이터를 가져온 후에 SQL 구문을 활용하여 데이터프레임에 대한 변환작업을 수행할 수 있다.
# iris 테이블 생성 - `iris_df` 스파크 데이터프레임을 콕 집음.
iris_df.createOrReplaceTempView("iris")
# 쿼리를 생성
iris_query = '''SELECT sepal_length, petal_length, species
FROM iris
WHERE species == "setosa"'''
# 쿼리를 실행시켜 스파크 데이터프레임 객체 신규 생성
iris_query_df = spark.sql(iris_query)
# 스파크 데이터프레임 결과 출력
iris_query_df.show(10)
+------------+------------+-------+
|sepal_length|petal_length|species|
+------------+------------+-------+
| 5.1| 1.4| setosa|
| 4.9| 1.4| setosa|
| 4.7| 1.3| setosa|
| 4.6| 1.5| setosa|
| 5.0| 1.4| setosa|
| 5.4| 1.7| setosa|
| 4.6| 1.4| setosa|
| 5.0| 1.5| setosa|
| 4.4| 1.4| setosa|
| 4.9| 1.5| setosa|
+------------+------------+-------+
only showing top 10 rows
시각화를 위해서 스파크 데이터프레임을 판다스 데이터프레임으로 변환시켜야 된다. 이를 위해서 .toPandas()
함수를 사용하고 matplotlib.pyplot
을 사용해서 시각화한다.
import matplotlib.pyplot as plt
iris_query_pd = iris_query_df.toPandas()
# 시각화
iris_query_pd.plot(kind='density')
plt.show()
qt platform plugin windows error
가 팝업창에 나오는 경우 pyqt5
를 설치함으로써 문제를 해결할 수 있다.