findspark
팩키지 사용스파크 클러스터가 생성되면, 이에 접근할 수 있는 지점이 필요한데 이를 SparkContext
라고 부른다. 스파크 컨텍스트(Spark Context)를 통해 스파크 클러스터에 접근하여 필요한 명령어를 전달하고 실행결과를 전달받게 된다. 통상 줄여서 SparkContext
를 sc
변수로 지칭한다.
findspark
팩키지를 통해서 스파크를 찾아내고 pyspark.SparkContext
명령어로 스파크 접속지점을 특정한다. sc
변수를 통해 스파크 버젼, 파이썬 버전, 마스터 정보를 확인한다.
R 마크다운으로 문서작업을 하기 위해서 먼저 파이썬을 사용할 수 있도록 reticulate
를 활용하여 파이썬을 사용할 수 있도록 조치를 취한다.
SparkSession
사용SparkSession
을 사용하여 스파크 세션을 생성하고 .version
을 통해 버젼을 확인한다. spark.read.csv()
메쏘드를 사용해서 데이터를 불러 읽어들인다.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master('local[*]')\
.appName('hello_world_app')\
.getOrCreate()
print(spark.version)
2.4.1
+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
| 5.1| 3.5| 1.4| .2| Setosa|
| 4.9| 3| 1.4| .2| Setosa|
| 4.7| 3.2| 1.3| .2| Setosa|
| 4.6| 3.1| 1.5| .2| Setosa|
| 5| 3.6| 1.4| .2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows
None
스파크 클러스터로 데이터 분석을 위한 데이터를 가져오는 방식은 다양한다.
sc.parallelize
sc.parallelize()
함수로 파이썬 리스트를 스파크 클러스터로 가져오는 코드는 다음과 같다. 파이썬 리스트가 스파크 RDD
로 변환된 것을 확인할 수 있다.
sc.textFile
sc.textFile()
함수로 외부 텍스트 데이터를 스파크 클러스터로 가져오는 코드는 다음과 같다. iris.csv
파일이 스파크 RDD
로 변환된 것을 확인할 수 있다. iris_partition_rdd.getNumPartitions()
함수로 몇조각으로 파티션이 나뉘었는지도 확인할 수 있다.
spark.sql
정형데이터schema=
를 사용해서 앞서 StructType()
에서 지정한 스키마를 사용할 경우 속도를 대폭 향상시킬 수 있다. 이유는 inferSchema=
를 사용할 경우 그마큼 속도가 늦어지기 때문이다.
spark.read
에 .format('csv)
와 같이 좀더 유연하게 자료형을 불러올 수도 있다. 많이 사용되는 인자를 정리하면 다음과 같다.
nullValue='NA'
: NA
값 지정header=True
: 직사각형 데이터 칼럼명 지정schema=irisSchema
: 스키마 지정from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder\
.master('local[*]')\
.appName('hello_world_app')\
.getOrCreate()
irisSchema = StructType([
StructField('sepal.length', DoubleType(), True),
StructField('sepal.width', DoubleType(), True),
StructField('petal.length', DoubleType(), True),
StructField('petal.width', DoubleType(), True),
StructField('variety', StringType(), True) ])
iris = spark.read.csv('data/iris.csv', header=True, schema=irisSchema, nullValue='NA')
iris_format = spark.read.format('csv').load('data/iris.csv', header=True, schema=irisSchema, nullValue='NA')
print(iris.printSchema())
root
|-- sepal.length: double (nullable = true)
|-- sepal.width: double (nullable = true)
|-- petal.length: double (nullable = true)
|-- petal.width: double (nullable = true)
|-- variety: string (nullable = true)
None
root
|-- sepal.length: double (nullable = true)
|-- sepal.width: double (nullable = true)
|-- petal.length: double (nullable = true)
|-- petal.width: double (nullable = true)
|-- variety: string (nullable = true)
None
텍스트 리스트를 sc.parallelize()
함수로 데이터를 가져온다. 그리고 나서 type()
명령어로 자료형이 RDD라는 사실을 확인한다. sc.textFile()
함수로 외부 .csv
데이터를 가져올 경우 minPartitions
인자를 설정하여 원본 데이터, 즉 빅데이터를 몇조각으로 나눌지 지정할 수 있다.
# 단어 리스트를 바탕으로 RDD 객체 생성
list_rdd = sc.parallelize(["빅데이터는", "스파크로", "스몰 데이터는", "데이터프레임으로"])
# RDD 자료형 확인
print("RDD 자료형: ", type(list_rdd))
# 단어 리스트를 바탕으로 파티션 반영 RDD 객체 생성
iris_partition_rdd = sc.textFile("data/iris.csv", minPartitions=3)
# RDD 자료형 확인
print("RDD 자료형: ", type(iris_partition_rdd), "\n파티션 갯수:", iris_partition_rdd.getNumPartitions())