< dataframe schema >
df 의 스키마 : 컬럼명과 데이터 타입
df 의 스키마 확인 방법 :
- df.printSchema()
- df.schema
스키마 직접 설정하여 df 만드는 예제
from pyspark.sql.types import StructType,StructField from pyspark.sql.types import StringType, IntegerType, ArrayType data = [ (("James","","Smith"),["Java","Scala","C++"],"OH","M",1), (("Anna","Rose",""),["Spark","Java","C++"],"NY","F",2), (("Julia","","Williams"),["CSharp","VB"],"OH","F",3), (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M",4), (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M",5), (("Mike","Mary","Williams"),["Python","VB"],"OH","M",6) ] schema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('languages', ArrayType(StringType()), True), StructField('state', StringType(), True), StructField('gender', StringType(), True), StructField('num', IntegerType(), True) ]) df = spark.createDataFrame(data = data, schema = schema) df.printSchema() df.show(truncate=False) |
df 의 컬럼 확인
>>> df.columns
['address', 'age', 'height', 'name', 'rollno', 'weight']
df 의 컬럼 선택 (select)
>>> from pyspark.sql.functions import expr
>>> df.select(expr("height"))
>>> df.select(expr("height as hhh"))
>>> df.select(expr("height").alias("hhh"))
>>> df.select(expr("height + 100")) # 연산도 가능
>>> df.select(expr("height + weight")) # 같은 digit 형이면, 연산 가능
>>> df.select(expr("`long col-umn name`")) # 컬럼 이름에 공백이나 dash(-) 가 포함되면, 백틱(`)으로 감싸서 선택해야 함
>>> df.selectExpr("height") # select 와 expr 을 한 번에
>>> df.selectExpr("*", "height < weight", "name as nnn")
>>> df.selectExpr("avg(height)", "count(distinct(name))") # 집계도 가능
>>> df.select("name")
>>> from pyspark.sql.functions import col,column
>>> df.select(col("name"))
>>> df.select(column("name"))
>>> df.select(col("name"), expr("height")) # 이렇게 두 가지 섞어쓰면 안 됨
>>> from pyspark.sql.functions import lit
>>> df.select(lit(1).alias("One")) # 명시적인 값을 넣어야 할 때 lit 을 사용
df 에 값 추가
>>> df.withColumn("myColumn", lit(1))
>>> df.withColumn("myColumn", expr("age>weight-20"))
df 컬럼 타입 확인
>>> df.schema
>>> df.schema["height"]
>>> df.schema["height"].dataType
df 컬럼 형 변환
>>> df.withColumn("str height", col("height").cast("string"))
df 컬럼 정렬
>>> df.orderBy("name")
>>> df.sort("weight") # orderBy 와 완벽하게 동일
>>> df.orderBy(expr("name"))
>>> df.orderBy("name", "age")
>>> df.orderBy(col("name"), col("age"))
>>> from pyspark.sql.functions import desc, asc
>>> df.orderBy(col("name").desc(), col("age").asc())
>>> df.orderBy(col("name").desc_nulls_first(), col("age").asc_nulls_last())
df 값을 driver 로 옮기는 방법
df.take(1)
df.first()
df.show()
df.collect()
df.toLocalIterator()
맨 아래 두개는 되도록 쓰지 말아야 함..
repartition vs coalesce
repartition 메소드 호출시 무조건 전체 셔플이 발생. 즉, 데이터들을 파티션 개수에 맞게, 균등하게 재배치하는 것임
기존 파티션 수보다 더 늘려야 할 때, 혹은 컬럼을 기준으로 파티션을 생성해야 할 때 사용
자주 필터링되는 컬럼이 있다면, 그 컬럼으로 파티션 재분배하여 성능 향상을 노릴 수 있음
coalesce 메소드는 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우 사용
말 그대로 그냥 파티션을 합치는 용도인가 봄
셔플 없이 합치면 skew 가 발생하지 않을까? 맞음, 균등한 분포가 보장되지 않음.
df 를 parquet 등으로 저장시, 하나의 파일로 만들고싶을 때 coalesce(1) 을 사용
참고로 coalesce 는 '합치다'라는 뜻임
특이하게 "각 파티션 내 데이터" 정렬이 가능함
예를 들어
spark.read.format("json").load("/my/json/path/file.json").sortWithinPartitions("count")
sortWithinPartitions는 각 파티션 내부에서만 데이터를 정렬하는 함수
즉, 전체 DataFrame을 정렬하는 것이 아니라, 개별 파티션 내에서만 정렬이 수행
아래와 같은 상황에서 사용함
- 파티션 간 순서는 신경 쓰지 않지만, 각 파티션 내부에서만 정렬이 필요할 때
- 예: Spark Streaming에서 파티션 단위로 정렬하여 처리해야 하는 경우
- 완전한 정렬보다 빠른 성능이 필요할 때
- 예: 일부 작업(예: mapPartitions)이 특정 순서로 정렬된 데이터에서만 유효한 경우
문자열 함수
>>> from pyspark.sql.functions import lower,upper,initcap,lit,ltrim,rtrim,rpad,lpad,trim
>>> df.select(
... initcap(col("name")),
... lower(col("name")),
... upper(col("name")),
... ltrim(lit(" this is sample ")),
... rtrim(lit(" this is sample ")),
... trim(lit(" this is sample ")),
... lpad(lit("this is sample"), 3, " "),
... rpad(lit("this is sample"), 10, " "))
filter
https://eyeballs.tistory.com/442
날짜
>>> from pyspark.sql.functions import current_date, current_timestamp
>>> df.select(current_date(), current_timestamp())
+--------------+-----------------------+
|current_date()|current_timestamp() |
+--------------+-----------------------+
|2025-02-04 |2025-02-04 22:26:37.882|
+--------------+-----------------------+
>>> from pyspark.sql.functions import date_add, date_sub
>>> df.select(date_add(col("today"), 3), date_sub(col("today"), 3)).limit(1).show()
혹은
>>> df.select(date_add(current_date(), 3), date_sub(current_timestamp(), 3)).limit(1).show()
+---------------------------+--------------------------------+
|date_add(current_date(), 3)|date_sub(current_timestamp(), 3)|
+---------------------------+--------------------------------+
| 2025-02-07| 2025-02-01|
+---------------------------+--------------------------------+
두 날짜 차이 구하기 : datediff
두 날짜 사이 개월 수 구하기 : months_between
string 을 날짜 컬럼으로 만들기 : to_date
>>> df.select(to_date(lit("2025-02-04")))
>>> df.select(to_date(lit("2025.02.04"), "yyyy.MM.dd"))
date 를 timestamp 로 만들기 : to_timestamp
>>> df.select(to_timestamp(to_date(lit("2025-02-04")))) # 시 분 초는 00:00:00 이 됨
< spark Session 설정 및 conf 설정 >
아래 설정으로 app 이름과 hive 연동 설정 및 queue 설정을 할 수 있음.
import os
import findspark
from pyspark.sql import SparkSession
SPARK_HOME = os.getenv("SPARK_HOME")
findspark.init()
findspark.find()
spark = SparkSession.builder \
.appName('eyeballs_test') \
.master("yarn") \
.config('spark.yarn.queue', 'low_queue') \
.config("hive.metastore.uris", "thrift://hive.metastore.uris:1234") \
.config("spark.yarn.dist.files", f"file:{SPARK_HOME}/lib/pyspark.zip,file:{SPARK_HOME}/lib/py4j-0.10.9-src.zip") \
.enableHiveSupport() \
.getOrCreate()
< SQL 의 like 처럼 filtering 하기 >
(column 값을 기준으로) SQL 의 like 조건문을 이용하여 row 를 filter 하는 방법은 다음과 같다.
df.filter(df.name.like('Al%')).collect()
[Row(age=2, name='Alice')]
참고 : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html
< regex 로 filtering 하기 >
(column 값을 기준으로) regex 를 이용하여 row 를 filter 하는 방법은 다음과 같다.
df.filter(" name rlike 'eyeballs' ")
= df.filter(col("name").rlike("eyeballs"))
df.filter(" id rlike '^(EYE.*|eye.*)$' ")
= df.filter(col("id").rlike("^(EYE.*|eye.*)$"))
< regex 로 filtering 한 값의 여집합 구하기 >
(column 값을 기준으로) regex 를 이용하여 row 를 filter 한 값의 여집합을 구하는 방법은 다음과 같다.
df.filter(" name not rlike 'eyeballs' ")
df.filter(" id not rlike '^(EYE.*|eye.*)$' ")
참고 : stackoverflow.com/a/58555818
< java jar 파일 import 하는 방법 >
pyspark --jars aaa.jar,bbb.jar,ccc.jar
참고 : stackoverflow.com/a/28489027
java class 를 jar 로 만드는 절차 : korbillgates.tistory.com/168
< url 값을 갖는 column 에서 url parsing 하기 >
url 값이 다음과 같다고 하자.
url.show(1,False)
+---|url|----+ |https://eyeballs.tistory.com/at.gif?data=ABC| |
다음과 같이 parse_url sql쿼리를 사용하여 url 로부터 QUERY 를 추출할 수 있다.
url = url.selectExpr("source", "parse_url(source, 'QUERY', 'data') as data")
url.show(1,False)
+---|url|data|----+ |https://eyeballs.tistory.com/at.gif?data=ABC|ABC| |
참고 :
https://stackoverflow.com/a/59634532
https://docs.microsoft.com/ko-kr/azure/databricks/sql/language-manual/functions/parse_url
< base64 로 인코딩된 값을 갖는 column 에서 base64 디코딩 하기 >
base64로 인코딩 된 값이 다음과 같다고 하자.
encoded_base64.show(1)
+--------------------+ | log| +--------------------+ |ZXllYmFsbHMudGlzdG9yeS5jb20=| +--------------------+ |
다음과 같이 unbase64 함수를 사용하여 base64 디코딩을 할 수 있다.
from pyspark.sql.functions import unbase64
decoded_base64 = encoded_base64.withColumn('log', unbase64(encoded_base64.log).cast("string"))
decoded_base64.show(1)
+--------------------+ | log| +--------------------+ |eyeballs.tistory.com| +--------------------+ |
참고 : https://stackoverflow.com/a/69707602
< explode >
array 로 된 column 값의 array 원소들을 모조리 하나의 row 로 나누는 것
참고 : https://brunch.co.kr/@yysttong/70
< datatype 확인하기 >
print(df.schema["log"].dataType)
결과 : ArrayType(StringType,true)
참고 : https://sparkbyexamples.com/pyspark/pyspark-find-datatype-column-names-of-dataframe/
< ArrayType 처리하기 >
참고 : https://mungingdata.com/pyspark/filter-array/
< replace >
df 에 있는 'address' column 의 값 중 AAA 를 BBB 로 바꾸는 방법
from pyspark.sql.functions import *
newDf = df.withColumn('address', regexp_replace('address', 'AAA', 'BBB'))
참고 :
https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/
https://stackoverflow.com/a/37038231
< df 대상으로 sql 사용 >
createOrReplaceTempView 메소드를 통하여
임시 view(table) 생성한 후, 해당 임시 테이블에 쿼리를 날릴 수 있음
df.createOrReplaceTempView("temporary_table")
spark.sql("SELECT col1, col2 FROM temporary_table where col2>3").show()
참고 :
https://jhleeeme.github.io/spark-temp-view/
https://sparkbyexamples.com/pyspark/pyspark-sql-expr-expression-function/
< SQL 의 CASE WHEN 문법 사용 >
아래처럼 when 함수를 사용하면 됨
when 함수의 첫번째 파라미터는 조건문, 두번째 파라미터는 조건문이 참일 때 넣는 값
from pyspark.sql.functions import when
df2 = df.withColumn("new_gender",
when(df.gender == "M", "Male")
.when(df.gender == "F", "Female")
.when(df.gender.isNull(), "")
.otherwise(df.gender))
참고 : https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/
< column 내 null 값 필터링 >
df.where(col("dt_mvmt").isNull())
df.where(col("dt_mvmt").isNotNull())
참고 : https://stackoverflow.com/a/37262973
< dataframe 정렬 >
sort 와 orderBy 는 완벽하게 같은 방식으로 동작
df.sort("number")
df.orderBy("number")
df.sort("number", "name")
df.sort(col("number"), col("name"))
df.sort(col("number").asc(), col("name").desc())
'Spark' 카테고리의 다른 글
[Spark] SQL Built-in Functions 문서 링크 (0) | 2021.06.16 |
---|---|
[Spark] json string 값을 갖는 column 에서 json 값 추출하는 방법 + 삽질의 결과 (0) | 2021.04.29 |
[Spark] Standalone + Hadoop 설치 방법 (0) | 2021.03.26 |
[Spark] 디버깅, 로그 보는 방법 링크(영어) (0) | 2021.03.24 |
[Spark] history ui port 바꾸는 방법 (0) | 2020.08.13 |