< spark Session 설정 및 conf 설정 >
아래 설정으로 app 이름과 hive 연동 설정 및 queue 설정을 할 수 있음.
import os
import findspark
from pyspark.sql import SparkSession
SPARK_HOME = os.getenv("SPARK_HOME")
findspark.init()
findspark.find()
spark = SparkSession.builder \
.appName('eyeballs_test') \
.master("yarn") \
.config('spark.yarn.queue', 'low_queue') \
.config("hive.metastore.uris", "thrift://hive.metastore.uris:1234") \
.config("spark.yarn.dist.files", f"file:{SPARK_HOME}/lib/pyspark.zip,file:{SPARK_HOME}/lib/py4j-0.10.9-src.zip") \
.enableHiveSupport() \
.getOrCreate()
< SQL 의 like 처럼 filtering 하기 >
(column 값을 기준으로) SQL 의 like 조건문을 이용하여 row 를 filter 하는 방법은 다음과 같다.
df.filter(df.name.like('Al%')).collect()
[Row(age=2, name='Alice')]
참고 : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.like.html
< regex 로 filtering 하기 >
(column 값을 기준으로) regex 를 이용하여 row 를 filter 하는 방법은 다음과 같다.
df.filter(" name rlike 'eyeballs' ")
= df.filter(col("name").rlike("eyeballs"))
df.filter(" id rlike '^(EYE.*|eye.*)$' ")
= df.filter(col("id").rlike("^(EYE.*|eye.*)$"))
< regex 로 filtering 한 값의 여집합 구하기 >
(column 값을 기준으로) regex 를 이용하여 row 를 filter 한 값의 여집합을 구하는 방법은 다음과 같다.
df.filter(" name not rlike 'eyeballs' ")
df.filter(" id not rlike '^(EYE.*|eye.*)$' ")
참고 : stackoverflow.com/a/58555818
< java jar 파일 import 하는 방법 >
pyspark --jars aaa.jar,bbb.jar,ccc.jar
참고 : stackoverflow.com/a/28489027
java class 를 jar 로 만드는 절차 : korbillgates.tistory.com/168
< url 값을 갖는 column 에서 url parsing 하기 >
url 값이 다음과 같다고 하자.
url.show(1,False)
+---|url|----+ |https://eyeballs.tistory.com/at.gif?data=ABC| |
다음과 같이 parse_url sql쿼리를 사용하여 url 로부터 QUERY 를 추출할 수 있다.
url = url.selectExpr("source", "parse_url(source, 'QUERY', 'data') as data")
url.show(1,False)
+---|url|data|----+ |https://eyeballs.tistory.com/at.gif?data=ABC|ABC| |
참고 :
https://stackoverflow.com/a/59634532
https://docs.microsoft.com/ko-kr/azure/databricks/sql/language-manual/functions/parse_url
< base64 로 인코딩된 값을 갖는 column 에서 base64 디코딩 하기 >
base64로 인코딩 된 값이 다음과 같다고 하자.
encoded_base64.show(1)
+--------------------+ | log| +--------------------+ |ZXllYmFsbHMudGlzdG9yeS5jb20=| +--------------------+ |
다음과 같이 unbase64 함수를 사용하여 base64 디코딩을 할 수 있다.
from pyspark.sql.functions import unbase64
decoded_base64 = encoded_base64.withColumn('log', unbase64(encoded_base64.log).cast("string"))
decoded_base64.show(1)
+--------------------+ | log| +--------------------+ |eyeballs.tistory.com| +--------------------+ |
참고 : https://stackoverflow.com/a/69707602
< explode >
array 로 된 column 값의 array 원소들을 모조리 하나의 row 로 나누는 것
참고 : https://brunch.co.kr/@yysttong/70
< datatype 확인하기 >
print(df.schema["log"].dataType)
결과 : ArrayType(StringType,true)
참고 : https://sparkbyexamples.com/pyspark/pyspark-find-datatype-column-names-of-dataframe/
< ArrayType 처리하기 >
참고 : https://mungingdata.com/pyspark/filter-array/
< replace >
df 에 있는 'address' column 의 값 중 AAA 를 BBB 로 바꾸는 방법
from pyspark.sql.functions import *
newDf = df.withColumn('address', regexp_replace('address', 'AAA', 'BBB'))
참고 :
https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/
https://stackoverflow.com/a/37038231
< df 대상으로 sql 사용 >
createOrReplaceTempView 메소드를 통하여
임시 view(table) 생성한 후, 해당 임시 테이블에 쿼리를 날릴 수 있음
df.createOrReplaceTempView("temporary_table")
spark.sql("SELECT col1, col2 FROM temporary_table where col2>3").show()
참고 :
https://jhleeeme.github.io/spark-temp-view/
https://sparkbyexamples.com/pyspark/pyspark-sql-expr-expression-function/
< SQL 의 CASE WHEN 문법 사용 >
아래처럼 when 함수를 사용하면 됨
when 함수의 첫번째 파라미터는 조건문, 두번째 파라미터는 조건문이 참일 때 넣는 값
from pyspark.sql.functions import when
df2 = df.withColumn("new_gender",
when(df.gender == "M", "Male")
.when(df.gender == "F", "Female")
.when(df.gender.isNull(), "")
.otherwise(df.gender))
참고 : https://sparkbyexamples.com/pyspark/pyspark-when-otherwise/
< column 내 null 값 필터링 >
df.where(col("dt_mvmt").isNull())
df.where(col("dt_mvmt").isNotNull())
참고 : https://stackoverflow.com/a/37262973
< dataframe 정렬 >
sort 와 orderBy 는 완벽하게 같은 방식으로 동작
df.sort("number")
df.orderBy("number")
df.sort("number", "name")
df.sort(col("number"), col("name"))
df.sort(col("number").asc(), col("name").desc())
'Spark' 카테고리의 다른 글
[Spark] SQL Built-in Functions 문서 링크 (0) | 2021.06.16 |
---|---|
[Spark] json string 값을 갖는 column 에서 json 값 추출하는 방법 + 삽질의 결과 (0) | 2021.04.29 |
[Spark] Standalone + Hadoop 설치 방법 (0) | 2021.03.26 |
[Spark] 디버깅, 로그 보는 방법 링크(영어) (0) | 2021.03.24 |
[Spark] history ui port 바꾸는 방법 (0) | 2020.08.13 |