< spark Session 설정 및 conf 설정 >

 

아래 설정으로 app 이름과 hive 연동 설정 및 queue 설정을 할 수 있음.

 

import os

import findspark

from pyspark.sql import SparkSession

 

SPARK_HOME = os.getenv("SPARK_HOME")

findspark.init()

findspark.find()

spark = SparkSession.builder \
    .appName('eyeballs_test') \
    .master("yarn") \
    .config('spark.yarn.queue', 'low_queue') \
    .config("hive.metastore.uris", "thrift://hive.metastore.uris:1234") \
    .config("spark.yarn.dist.files", f"file:{SPARK_HOME}/lib/pyspark.zip,file:{SPARK_HOME}/lib/py4j-0.10.9-src.zip") \
    .enableHiveSupport() \
    .getOrCreate()

 

 

 

 

 

 

 

 

< SQL 의 like 처럼 filtering 하기 >

(column 값을 기준으로) SQL 의 like 조건문을 이용하여 row 를 filter 하는 방법은 다음과 같다.

 

df.filter(df.name.like('Al%')).collect()
[Row(age=2, name='Alice')]

 

참고 : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html

 

 

 

< regex 로 filtering 하기 >

 

(column 값을 기준으로) regex 를 이용하여 row 를 filter 하는 방법은 다음과 같다.

 

df.filter(" name rlike 'eyeballs' ")

 = df.filter(col("name").rlike("eyeballs"))

 

df.filter(" id rlike '^(EYE.*|eye.*)$' ")

 = df.filter(col("id").rlike("^(EYE.*|eye.*)$"))

 

 

 

 

< regex 로 filtering 한 값의 여집합 구하기 >

 

(column 값을 기준으로) regex 를 이용하여 row 를 filter 한 값의 여집합을 구하는 방법은 다음과 같다.

 

df.filter(" name not rlike 'eyeballs' ")

 

df.filter(" id not rlike '^(EYE.*|eye.*)$' ")

 

참고 : stackoverflow.com/a/58555818

 

 

 

 

 

< java jar 파일 import 하는 방법 >

 

pyspark --jars aaa.jar,bbb.jar,ccc.jar

 

참고 : stackoverflow.com/a/28489027

java class 를 jar 로 만드는 절차 : korbillgates.tistory.com/168

stackoverflow.com/a/61972525

 

 

 

 

< url 값을 갖는 column 에서 url parsing 하기 >

 

url 값이 다음과 같다고 하자.

 

url.show(1,False)

 

+---|url|----+
|https://eyeballs.tistory.com/at.gif?data=ABC|

 

다음과 같이 parse_url sql쿼리를 사용하여 url 로부터 QUERY 를 추출할 수 있다.


url = url.selectExpr("source", "parse_url(source, 'QUERY', 'data') as data")
url.show(1,False)

 

+---|url|data|----+
|https://eyeballs.tistory.com/at.gif?data=ABC|ABC|



참고 :

https://stackoverflow.com/a/59634532

https://docs.microsoft.com/ko-kr/azure/databricks/sql/language-manual/functions/parse_url

 

 

 

 

 

< base64 로 인코딩된 값을 갖는 column 에서 base64 디코딩 하기 >

 

base64로 인코딩 된 값이 다음과 같다고 하자.

 

encoded_base64.show(1)

 

+--------------------+
|                 log|
+--------------------+
|ZXllYmFsbHMudGlzdG9yeS5jb20=|
+--------------------+

 

다음과 같이 unbase64 함수를 사용하여 base64 디코딩을 할 수 있다.

 

from pyspark.sql.functions import unbase64

decoded_base64 = encoded_base64.withColumn('log', unbase64(encoded_base64.log).cast("string"))
decoded_base64.show(1)

 

+--------------------+
|                 log|
+--------------------+
|eyeballs.tistory.com|
+--------------------+

 

참고 : https://stackoverflow.com/a/69707602

 

 

 

 

< explode >

array 로 된 column 값의 array 원소들을 모조리 하나의 row 로 나누는 것

참고 : https://brunch.co.kr/@yysttong/70

 

 

 

< datatype 확인하기 >

 

print(df.schema["log"].dataType)
결과 : ArrayType(StringType,true)

 

참고 : https://sparkbyexamples.com/pyspark/pyspark-find-datatype-column-names-of-dataframe/

 

 

 

 

< ArrayType 처리하기 >

 

참고 : https://mungingdata.com/pyspark/filter-array/

 

 

 

< replace >

 

df 에 있는 'address' column 의 값 중 AAA 를 BBB 로 바꾸는 방법

 

from pyspark.sql.functions import *
newDf = df.withColumn('address', regexp_replace('address', 'AAA', 'BBB'))

 

참고 :

https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/

https://stackoverflow.com/a/37038231

 

 

 

 

< df 대상으로 sql 사용 >

 

createOrReplaceTempView 메소드를 통하여

임시 view(table) 생성한 후, 해당 임시 테이블에 쿼리를 날릴 수 있음

 

df.createOrReplaceTempView("temporary_table")
spark.sql("SELECT col1, col2 FROM temporary_table where col2>3").show()

 

참고 :

https://jhleeeme.github.io/spark-temp-view/

https://sparkbyexamples.com/pyspark/pyspark-sql-expr-expression-function/

 

 

< SQL 의 CASE WHEN 문법 사용 >

 

아래처럼 when 함수를 사용하면 됨

when 함수의 첫번째 파라미터는 조건문, 두번째 파라미터는 조건문이 참일 때 넣는 값

 

from pyspark.sql.functions import when
df2 = df.withColumn("new_gender", 

                                 when(df.gender == "M", "Male")
                                 .when(df.gender == "F", "Female")
                                 .when(df.gender.isNull(), "")
                                 .otherwise(df.gender))

 

참고 : https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/

 

 

 

 

< column 내 null 값 필터링 >

 

df.where(col("dt_mvmt").isNull())
df.where(col("dt_mvmt").isNotNull())

 

참고 : https://stackoverflow.com/a/37262973

 

 

 

< dataframe 정렬 >

 

sort 와 orderBy 는 완벽하게 같은 방식으로 동작

 

df.sort("number")

df.orderBy("number")

df.sort("number", "name")

df.sort(col("number"), col("name"))

df.sort(col("number").asc(), col("name").desc())

 

 

 

 

 

 

 

 

+ Recent posts