1 스파크 SQL 들어가며

pip install pyspark로 파이썬 스파크를 설치하게 되면 스파크세션을 만들어서 스파크 SQL로 들어갈 수 있는 여정을 시작할 수 있다. 스파크 Getting Started를 참조해서 스파크 세션을 생성한다.

그리고 나서, 그 유명한 붓꽃 데이터(iris.csv) 데이터를 로컬 컴퓨터에서 불러와서 스파크 데이터프레임으로 생성시킨다.

from pyspark.sql import *

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option") \
    .getOrCreate()
    
iris_df = spark.read.csv("data/iris.csv", inferSchema = True, header = True)

iris_df.show(5)
+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows

1.1 스파크 SQL

다음으로 데이터프레임에 SQL을 적용시킬 수 있는 객체를 별도로 만든다. 이때 스파크데이터프레임.createOrReplaceTempView("객체명") 메소드를 사용한다. spark.sql() 메쏘드를 사용해서 SQL 문을 던져 원하는 결과를 얻을 수 있다.

iris_df.createOrReplaceTempView("iris")

spark.sql("SELECT * FROM iris LIMIT 5").show()
+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
|         4.6|        3.1|         1.5|        0.2| Setosa|
|         5.0|        3.6|         1.4|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+

1.2 스파크 SQL 스키마

테이블에 SQL 질의(Query)를 던지기 전에 가장 먼저 해야 되는 작업은 아마도 스카마(Schema) 구조를 파악하는 것이다. 이를 위해서 SQL DESCRIBE 명령어를 사용한다.

spark.sql("DESCRIBE iris").show()
+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|sepal.length|   double|   null|
| sepal.width|   double|   null|
|petal.length|   double|   null|
| petal.width|   double|   null|
|     variety|   string|   null|
+------------+---------+-------+

2 스파크 SQL 기본기 1

2.1 데이터 프레임 생성

createDataFrame() 메쏘드를 사용해서 스파크 데이터프레임을 작성한다. 그리고, 판다스 데이터프레임에서 스파크 데이터프레임도 생성이 가능하다. 앞써 spark.read_csv() 메쏘드, DataFrameReader를 사용해서 스파크 데이터프레임 생성하는 것도 가능하다.

df1 = spark.createDataFrame([(1, "andy", 20, "USA"), 
                             (2, "jeff", 23, "China"), 
                             (3, "james", 18, "USA")]).toDF("id", "name", "age", "country")

df1.printSchema
<bound method DataFrame.printSchema of DataFrame[id: bigint, name: string, age: bigint, country: string]>
df1.show()

# 판다스 데이터프레임에서 스파크 데이터프레임 생성
+---+-----+---+-------+
| id| name|age|country|
+---+-----+---+-------+
|  1| andy| 20|    USA|
|  2| jeff| 23|  China|
|  3|james| 18|    USA|
+---+-----+---+-------+
df2 = spark.createDataFrame(df1.toPandas())
df2.printSchema
<bound method DataFrame.printSchema of DataFrame[id: bigint, name: string, age: bigint, country: string]>
df2.show()
+---+-----+---+-------+
| id| name|age|country|
+---+-----+---+-------+
|  1| andy| 20|    USA|
|  2| jeff| 23|  China|
|  3|james| 18|    USA|
+---+-----+---+-------+

2.2 신규 필드 생성

dplyr 팩키지 mutate와 마찬가지로 신규 필드를 생성할 때는 withColumn() 메쏘드를 사용한다.

df2 = df1.withColumn("age2", df1["age"] + 1)
df2.show()
+---+-----+---+-------+----+
| id| name|age|country|age2|
+---+-----+---+-------+----+
|  1| andy| 20|    USA|  21|
|  2| jeff| 23|  China|  24|
|  3|james| 18|    USA|  19|
+---+-----+---+-------+----+

2.3 칼럼 추출 및 제거

dplyr 팩키지 select와 마찬가지로 원하는 변수 칼럼을 추출하고자 할 때는 select 메쏘드를 사용한다. 칼럼을 제거하고자 하는 경우 drop을 사용한다.

df2 = df1.select("id", "name")
df2.show()
+---+-----+
| id| name|
+---+-----+
|  1| andy|
|  2| jeff|
|  3|james|
+---+-----+
df1.drop("id", "name").show()
+---+-------+
|age|country|
+---+-------+
| 20|    USA|
| 23|  China|
| 18|    USA|
+---+-------+

2.4 관측점 행 추출

dplyr 팩키지 filter와 마찬가지로 원하는 관측점 행을 추출하고자 할 때는 동일한 명칭의 filter 메쏘드를 사용한다.

df1.filter(df1["age"] >= 20).show()
+---+----+---+-------+
| id|name|age|country|
+---+----+---+-------+
|  1|andy| 20|    USA|
|  2|jeff| 23|  China|
+---+----+---+-------+

2.5 그룹별 요약

그룹별 요약을 하는데 groupByagg와 함께 사용한다. 이는 dplyr 팩키지 group_by + summarize와 동일한 개념이다.

df1.groupBy("country").agg({"age": "avg", "id": "count"}).show()
+-------+---------+--------+
|country|count(id)|avg(age)|
+-------+---------+--------+
|  China|        1|    23.0|
|    USA|        2|    19.0|
+-------+---------+--------+

2.6 사용자 정의함수(UDF)

사용자 정의함수(User Defined Function)을 작성하여 표준 SQL 구문에서 제공되지 않는 연산작업을 수행시킬 수 있다.

from pyspark.sql.functions import udf
upper_character = udf(lambda x: x.upper())

df1.select(upper_character(df1["name"])).show()
+--------------+
|<lambda>(name)|
+--------------+
|          ANDY|
|          JEFF|
|         JAMES|
+--------------+

2.7 데이터프레임 죠인

두개의 서로 다른 스파크 데이터프레임을 죠인(join)하는 것도 가능하다.

df1.show()
+---+-----+---+-------+
| id| name|age|country|
+---+-----+---+-------+
|  1| andy| 20|    USA|
|  2| jeff| 23|  China|
|  3|james| 18|    USA|
+---+-----+---+-------+
df2 = spark.createDataFrame([(1, "USA"), (2, "China")]).toDF("c_id", "c_name")
df2.show()
+----+------+
|c_id|c_name|
+----+------+
|   1|   USA|
|   2| China|
+----+------+
df1.join(df2, df1["id"] == df2["c_id"]).show()
+---+----+---+-------+----+------+
| id|name|age|country|c_id|c_name|
+---+----+---+-------+----+------+
|  1|andy| 20|    USA|   1|   USA|
|  2|jeff| 23|  China|   2| China|
+---+----+---+-------+----+------+

3 윈도우 함수

databricks, “Introducing Window Functions in Spark SQL Notebook”에서 데이터를 준비한다.

data = \
  [("Thin", "Cell Phone", 6000),
  ("Normal", "Tablet", 1500),
  ("Mini", "Tablet", 5500),
  ("Ultra thin", "Cell Phone", 5500),
  ("Very thin", "Cell Phone", 6000),
  ("Big", "Tablet", 2500),
  ("Bendable", "Cell Phone", 3000),
  ("Foldable", "Cell Phone", 3000),
  ("Pro", "Tablet", 4500),
  ("Pro2", "Tablet", 6500)]
  
df = spark.createDataFrame(data, ["product", "category", "revenue"])

df.createOrReplaceTempView("product")

start_df = spark.sql("SELECT category, product, revenue \
                      FROM product \
                      ORDER BY category, revenue DESC")
start_df.show()                      
+----------+----------+-------+
|  category|   product|revenue|
+----------+----------+-------+
|Cell Phone| Very thin|   6000|
|Cell Phone|      Thin|   6000|
|Cell Phone|Ultra thin|   5500|
|Cell Phone|  Foldable|   3000|
|Cell Phone|  Bendable|   3000|
|    Tablet|      Pro2|   6500|
|    Tablet|      Mini|   5500|
|    Tablet|       Pro|   4500|
|    Tablet|       Big|   2500|
|    Tablet|    Normal|   1500|
+----------+----------+-------+

제품군별로 가장 매출 차이를 찾아보고자 하는 사례를 만들어보자. LAG, LEAD를 OVER와 함께 사용하여 윈도우 함수를 적용하여 관측점을 이동시킬 수 있다. 하지만 제품군내에서 작업된 것은 아니라 시각적으로 불편한다.

start_df.createOrReplaceTempView("start_tbl")

reveune_query = """
    SELECT category, product, 
    LAG(revenue, 1) OVER (ORDER BY revenue) AS revenue_lag,
    revenue,
    LEAD(revenue, 1) OVER (ORDER BY revenue) AS revenue_lead
    FROM start_tbl
    """

spark.sql(reveune_query).show()
+----------+----------+-----------+-------+------------+
|  category|   product|revenue_lag|revenue|revenue_lead|
+----------+----------+-----------+-------+------------+
|    Tablet|    Normal|       null|   1500|        2500|
|    Tablet|       Big|       1500|   2500|        3000|
|Cell Phone|  Bendable|       2500|   3000|        3000|
|Cell Phone|  Foldable|       3000|   3000|        4500|
|    Tablet|       Pro|       3000|   4500|        5500|
|Cell Phone|Ultra thin|       4500|   5500|        5500|
|    Tablet|      Mini|       5500|   5500|        6000|
|Cell Phone|      Thin|       5500|   6000|        6000|
|Cell Phone| Very thin|       6000|   6000|        6500|
|    Tablet|      Pro2|       6000|   6500|        null|
+----------+----------+-----------+-------+------------+

PARTITION BY를 그룹 집단을 도입하게 되면 원하는 결과를 얻을 수 있게 된다.

reveune_query = """
    SELECT category, product, 
    LAG(revenue, 1) OVER (PARTITION BY category ORDER BY revenue) AS revenue_lag,
    revenue,
    LEAD(revenue, 1) OVER (PARTITION BY category ORDER BY revenue) AS revenue_lead
    FROM start_tbl
    """

spark.sql(reveune_query).show()
+----------+----------+-----------+-------+------------+
|  category|   product|revenue_lag|revenue|revenue_lead|
+----------+----------+-----------+-------+------------+
|    Tablet|    Normal|       null|   1500|        2500|
|    Tablet|       Big|       1500|   2500|        4500|
|    Tablet|       Pro|       2500|   4500|        5500|
|    Tablet|      Mini|       4500|   5500|        6500|
|    Tablet|      Pro2|       5500|   6500|        null|
|Cell Phone|  Bendable|       null|   3000|        3000|
|Cell Phone|  Foldable|       3000|   3000|        5500|
|Cell Phone|Ultra thin|       3000|   5500|        6000|
|Cell Phone|      Thin|       5500|   6000|        6000|
|Cell Phone| Very thin|       6000|   6000|        null|
+----------+----------+-----------+-------+------------+

ROW_NUMBER()를 도입하게 되면 각 그룹별 번호를 매길 수 있게 된다.

reveune_query = """
    SELECT category, product, 
    ROW_NUMBER() OVER(PARTITION BY category ORDER BY revenue) AS id,
    LAG(revenue, 1) OVER (PARTITION BY category ORDER BY revenue) AS revenue_lag,
    revenue,
    LEAD(revenue, 1) OVER (PARTITION BY category ORDER BY revenue) AS revenue_lead
    FROM start_tbl
    """

spark.sql(reveune_query).show()
+----------+----------+---+-----------+-------+------------+
|  category|   product| id|revenue_lag|revenue|revenue_lead|
+----------+----------+---+-----------+-------+------------+
|    Tablet|    Normal|  1|       null|   1500|        2500|
|    Tablet|       Big|  2|       1500|   2500|        4500|
|    Tablet|       Pro|  3|       2500|   4500|        5500|
|    Tablet|      Mini|  4|       4500|   5500|        6500|
|    Tablet|      Pro2|  5|       5500|   6500|        null|
|Cell Phone|  Bendable|  1|       null|   3000|        3000|
|Cell Phone|  Foldable|  2|       3000|   3000|        5500|
|Cell Phone|Ultra thin|  3|       3000|   5500|        6000|
|Cell Phone|      Thin|  4|       5500|   6000|        6000|
|Cell Phone| Very thin|  5|       6000|   6000|        null|
+----------+----------+---+-----------+-------+------------+