< dataframe schema >

 

df 의 스키마 : 컬럼명과 데이터 타입

df 의 스키마 확인 방법 :

 - df.printSchema()

 - df.schema

 

스키마 직접 설정하여 df 만드는 예제

 

from pyspark.sql.types import StructType,StructField
from pyspark.sql.types import StringType, IntegerType, ArrayType
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M",1),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F",2),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F",3),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M",4),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M",5),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M",6)
 ]
       
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True),
     StructField('num', IntegerType(), True)
 ])
 
df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

 

 

 

 


 

 

df 의 컬럼 확인

>>> df.columns
['address', 'age', 'height', 'name', 'rollno', 'weight']

 

 

df 의 컬럼 선택 (select)

>>> from pyspark.sql.functions import expr

>>> df.select(expr("height"))
>>> df.select(expr("height as hhh"))
>>> df.select(expr("height").alias("hhh"))
>>> df.select(expr("height + 100"))  # 연산도 가능

>>> df.select(expr("height + weight"))  # 같은 digit 형이면, 연산 가능

>>> df.select(expr("`long col-umn name`"))   # 컬럼 이름에 공백이나 dash(-) 가 포함되면, 백틱(`)으로 감싸서 선택해야 함 

>>> df.selectExpr("height")  # select 와 expr 을 한 번에

>>> df.selectExpr("*", "height < weight", "name as nnn")

>>> df.selectExpr("avg(height)", "count(distinct(name))")  # 집계도 가능

>>> df.select("name")

>>> from pyspark.sql.functions import col,column

>>> df.select(col("name"))

>>> df.select(column("name"))

>>> df.select(col("name"), expr("height")) # 이렇게 두 가지 섞어쓰면 안 됨

>>> from pyspark.sql.functions import lit
>>> df.select(lit(1).alias("One"))  # 명시적인 값을 넣어야 할 때 lit 을 사용

 

 

 

df 에 값 추가

>>> df.withColumn("myColumn", lit(1))

>>> df.withColumn("myColumn", expr("age>weight-20"))

 

 

df 컬럼 타입 확인

>>> df.schema

>>> df.schema["height"]

>>> df.schema["height"].dataType

 

 

df 컬럼 형 변환

>>> df.withColumn("str height", col("height").cast("string"))

 

 

df 컬럼 정렬

>>> df.orderBy("name")

>>> df.sort("weight")  # orderBy 와 완벽하게 동일
>>> df.orderBy(expr("name"))
>>> df.orderBy("name", "age")

>>> df.orderBy(col("name"), col("age"))

>>> from pyspark.sql.functions import desc, asc

>>> df.orderBy(col("name").desc(), col("age").asc())

>>> df.orderBy(col("name").desc_nulls_first(), col("age").asc_nulls_last())

 

 

 

 


 

 

 

df 값을 driver 로 옮기는 방법

 

df.take(1)

df.first()

df.show()

df.collect()

df.toLocalIterator()

 

맨 아래 두개는 되도록 쓰지 말아야 함..

 

 


 

 

 

repartition vs coalesce

 

repartition 메소드 호출시 무조건 전체 셔플이 발생. 즉, 데이터들을 파티션 개수에 맞게, 균등하게 재배치하는 것임

기존 파티션 수보다 더 늘려야 할 때, 혹은 컬럼을 기준으로 파티션을 생성해야 할 때 사용

자주 필터링되는 컬럼이 있다면, 그 컬럼으로 파티션 재분배하여 성능 향상을 노릴 수 있음

 

coalesce 메소드는 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우 사용

말 그대로 그냥 파티션을 합치는 용도인가 봄

셔플 없이 합치면 skew 가 발생하지 않을까? 맞음, 균등한 분포가 보장되지 않음.

df 를 parquet 등으로 저장시, 하나의 파일로 만들고싶을 때 coalesce(1) 을 사용

 

 

참고로 coalesce 는 '합치다'라는 뜻임

 

 


 

 

 

 

특이하게 "각 파티션 내 데이터" 정렬이 가능함

예를 들어 

spark.read.format("json").load("/my/json/path/file.json").sortWithinPartitions("count")

 

sortWithinPartitions는 각 파티션 내부에서만 데이터를 정렬하는 함수

즉, 전체 DataFrame을 정렬하는 것이 아니라, 개별 파티션 내에서만 정렬이 수행

 

아래와 같은 상황에서 사용함

 

  • 파티션 간 순서는 신경 쓰지 않지만, 각 파티션 내부에서만 정렬이 필요할 때
    • 예: Spark Streaming에서 파티션 단위로 정렬하여 처리해야 하는 경우
  • 완전한 정렬보다 빠른 성능이 필요할 때
    • 예: 일부 작업(예: mapPartitions)이 특정 순서로 정렬된 데이터에서만 유효한 경우

 

 

 

 

 


 

 

 

문자열 함수

>>> from pyspark.sql.functions import lower,upper,initcap,lit,ltrim,rtrim,rpad,lpad,trim

>>> df.select(
... initcap
(col("name")),
... 
lower(col("name")),
... upper(col("name")),
... ltrim(lit(" this is sample ")),
... rtrim(lit(" this is sample ")),
... trim(lit(" this is sample ")),
... lpad(lit("this is sample"), 3, " "),
... rpad(lit("this is sample"), 10, " "))

 

 

 


 

 

filter

https://eyeballs.tistory.com/442

 

 


 

 

 

날짜

>>> from pyspark.sql.functions import current_date, current_timestamp

>>> df.select(current_date(), current_timestamp())

+--------------+-----------------------+
|current_date()|current_timestamp()    |
+--------------+-----------------------+
|2025-02-04    |2025-02-04 22:26:37.882|
+--------------+-----------------------+

 

>>> from pyspark.sql.functions import date_add, date_sub

>>> df.select(date_add(col("today"), 3), date_sub(col("today"), 3)).limit(1).show()
혹은

>>> df.select(date_add(current_date(), 3), date_sub(current_timestamp(), 3)).limit(1).show()
+---------------------------+--------------------------------+
|date_add(current_date(), 3)|date_sub(current_timestamp(), 3)|
+---------------------------+--------------------------------+
|                 2025-02-07|                      2025-02-01|
+---------------------------+--------------------------------+

 

두 날짜 차이 구하기 : datediff

두 날짜 사이 개월 수 구하기 : months_between

 

string 을 날짜 컬럼으로 만들기 : to_date

>>> df.select(to_date(lit("2025-02-04")))

>>> df.select(to_date(lit("2025.02.04"), "yyyy.MM.dd"))

 

date 를 timestamp 로 만들기 : to_timestamp

>>> df.select(to_timestamp(to_date(lit("2025-02-04"))))  # 시 분 초는 00:00:00 이 됨

 

 


 

 

 

 

< spark Session 설정 및 conf 설정 >

 

아래 설정으로 app 이름과 hive 연동 설정 및 queue 설정을 할 수 있음.

 

import os

import findspark

from pyspark.sql import SparkSession

 

SPARK_HOME = os.getenv("SPARK_HOME")

findspark.init()

findspark.find()

spark = SparkSession.builder \
    .appName('eyeballs_test') \
    .master("yarn") \
    .config('spark.yarn.queue', 'low_queue') \
    .config("hive.metastore.uris", "thrift://hive.metastore.uris:1234") \
    .config("spark.yarn.dist.files", f"file:{SPARK_HOME}/lib/pyspark.zip,file:{SPARK_HOME}/lib/py4j-0.10.9-src.zip") \
    .enableHiveSupport() \
    .getOrCreate()

 

 

 


 

 

 

 

< SQL 의 like 처럼 filtering 하기 >

(column 값을 기준으로) SQL 의 like 조건문을 이용하여 row 를 filter 하는 방법은 다음과 같다.

 

df.filter(df.name.like('Al%')).collect()
[Row(age=2, name='Alice')]

 

참고 : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html

 

 


 

 

 

 

< regex 로 filtering 하기 >

 

(column 값을 기준으로) regex 를 이용하여 row 를 filter 하는 방법은 다음과 같다.

 

df.filter(" name rlike 'eyeballs' ")

 = df.filter(col("name").rlike("eyeballs"))

 

df.filter(" id rlike '^(EYE.*|eye.*)$' ")

 = df.filter(col("id").rlike("^(EYE.*|eye.*)$"))

 

 

 


 

 

 

 

< regex 로 filtering 한 값의 여집합 구하기 >

 

(column 값을 기준으로) regex 를 이용하여 row 를 filter 한 값의 여집합을 구하는 방법은 다음과 같다.

 

df.filter(" name not rlike 'eyeballs' ")

 

df.filter(" id not rlike '^(EYE.*|eye.*)$' ")

 

참고 : stackoverflow.com/a/58555818

 

 

 


 

 

 

 

 

< java jar 파일 import 하는 방법 >

 

pyspark --jars aaa.jar,bbb.jar,ccc.jar

 

참고 : stackoverflow.com/a/28489027

java class 를 jar 로 만드는 절차 : korbillgates.tistory.com/168

stackoverflow.com/a/61972525

 

 

 

 


 

 

 

 

 

< url 값을 갖는 column 에서 url parsing 하기 >

 

url 값이 다음과 같다고 하자.

 

url.show(1,False)

 

+---|url|----+
|https://eyeballs.tistory.com/at.gif?data=ABC|

 

다음과 같이 parse_url sql쿼리를 사용하여 url 로부터 QUERY 를 추출할 수 있다.


url = url.selectExpr("source", "parse_url(source, 'QUERY', 'data') as data")
url.show(1,False)

 

+---|url|data|----+
|https://eyeballs.tistory.com/at.gif?data=ABC|ABC|



참고 :

https://stackoverflow.com/a/59634532

https://docs.microsoft.com/ko-kr/azure/databricks/sql/language-manual/functions/parse_url

 

 

 

 


 

 

 

 

 

< base64 로 인코딩된 값을 갖는 column 에서 base64 디코딩 하기 >

 

base64로 인코딩 된 값이 다음과 같다고 하자.

 

encoded_base64.show(1)

 

+--------------------+
|                 log|
+--------------------+
|ZXllYmFsbHMudGlzdG9yeS5jb20=|
+--------------------+

 

다음과 같이 unbase64 함수를 사용하여 base64 디코딩을 할 수 있다.

 

from pyspark.sql.functions import unbase64

decoded_base64 = encoded_base64.withColumn('log', unbase64(encoded_base64.log).cast("string"))
decoded_base64.show(1)

 

+--------------------+
|                 log|
+--------------------+
|eyeballs.tistory.com|
+--------------------+

 

참고 : https://stackoverflow.com/a/69707602

 

 

 

 


 

 

 

 

 

< explode >

array 로 된 column 값의 array 원소들을 모조리 하나의 row 로 나누는 것

참고 : https://brunch.co.kr/@yysttong/70

 

 

 

 


 

 

 

 

 

< datatype 확인하기 >

 

print(df.schema["log"].dataType)
결과 : ArrayType(StringType,true)

 

참고 : https://sparkbyexamples.com/pyspark/pyspark-find-datatype-column-names-of-dataframe/

 

 

 

 


 

 

 

 

 

< ArrayType 처리하기 >

 

참고 : https://mungingdata.com/pyspark/filter-array/

 

 

 

 


 

 

 

 

< replace >

 

df 에 있는 'address' column 의 값 중 AAA 를 BBB 로 바꾸는 방법

 

from pyspark.sql.functions import *
newDf = df.withColumn('address', regexp_replace('address', 'AAA', 'BBB'))

 

참고 :

https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/

https://stackoverflow.com/a/37038231

 

 

 

 

 


 

 

 

 

< df 대상으로 sql 사용 >

 

createOrReplaceTempView 메소드를 통하여

임시 view(table) 생성한 후, 해당 임시 테이블에 쿼리를 날릴 수 있음

 

df.createOrReplaceTempView("temporary_table")
spark.sql("SELECT col1, col2 FROM temporary_table where col2>3").show()

 

참고 :

https://jhleeeme.github.io/spark-temp-view/

https://sparkbyexamples.com/pyspark/pyspark-sql-expr-expression-function/

 

 

 

 

 


 

 

 

 

< SQL 의 CASE WHEN 문법 사용 >

 

아래처럼 when 함수를 사용하면 됨

when 함수의 첫번째 파라미터는 조건문, 두번째 파라미터는 조건문이 참일 때 넣는 값

 

from pyspark.sql.functions import when
df2 = df.withColumn("new_gender", 

                                 when(df.gender == "M", "Male")
                                 .when(df.gender == "F", "Female")
                                 .when(df.gender.isNull(), "")
                                 .otherwise(df.gender))

 

참고 : https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/

 

 

 

 


 

 

 

 

 

< column 내 null 값 필터링 >

 

df.where(col("dt_mvmt").isNull())
df.where(col("dt_mvmt").isNotNull())

 

참고 : https://stackoverflow.com/a/37262973

 

 

 

 

 

 


 

 

 

 

 

 

< dataframe 정렬 >

 

sort 와 orderBy 는 완벽하게 같은 방식으로 동작

 

df.sort("number")

df.orderBy("number")

df.sort("number", "name")

df.sort(col("number"), col("name"))

df.sort(col("number").asc(), col("name").desc())

 

 

 

 

 

 

 

 

+ Recent posts