column 내용이 json 이고 타입은 string 인 dataframe 에서
json 의 내용을 추출하고자 한다.
예를 들어 다음과 같은 dataframe 이 있다고 하자.
< pyspark > from pyspark.sql.functions import get_json_object # 테스트용 dataframe 생성 jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""") #show 를 해보면 json 값이 string 타입으로 들어가있다. jsonDF.show() +--------------------+ | jsonString| +--------------------+ |{"myJSONKey" : {"...| +--------------------+ #타입은 다음과 같이 확인할 수 있다. jsonDF.dtypes [('jsonString', 'string')] #json string 에서 값을 꺼내려면 다음과 같이 get_json_object 를 사용한다. result = jsonDF.select( get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("result") ) #결과값으로 두번째 값인 2가 나온다. result.show() +------+ |result| +------+ | 2| +------+ |
< scala > import org.apache.spark.sql.functions.get_json_object val jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""") result = jsonDF.select( get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column" ) |
만약 위에처럼 json 내부에 또 다른 json 이 있는 nested 형태가 아니라
그냥 json 이라면, json_tuple 을 사용할 수 있다.
예를 들어,
< pyspark > from pyspark.sql.functions import get_json_object, json_tuple result = jsonDF.select(json_tuple(col("jsonString"), "myJSONKey")) |
< scala > import org.apache.spark.sql.functions.json_tuple result = jsonDF.select(col("jsonString"), "myJSONKey") |
sql 을 사용할 수 있는 selectExpr 를 이용하면 다음과 같이 처리할 수 있다.
result = jsonDF.selectExpr( "get_json_object(jsonString, '$.myJSONKey.myJSONValue[1]') as column", "json_tuple(jsonString, 'myJSONKey')" ) |
하나가 아니라 여러개의 value 를 가져오고 싶다면 다음과 같이 * 을 사용하면 된다.
..... converted = inputDF\ .withColumn("areaCode", get_json_object("phones", "$[*].areaCode"))\ .withColumn("phones", explode(from_json("phones", phone_schema)))\ .withColumn("phone", col("phones.phone"))\ .drop("phones")\ .filter(~isnull("phone")) ..... |
출처 : https://stackoverflow.com/a/47944084
다른 예제 살펴보자. 아래와 같은 json 이 있다고 하자 < /eyeballs/path/sample.json > { "a" : { "b" : { "c" : "d" } } } 이걸 spark 에서 읽는 방법은 아래와 같음 val df = spark.read.option("multiline", "true").json("/eyeballs/path/sample.json") 이렇게 읽은 df 를 show 하면 이상하게 말단에 있는 값만 뜸 df.show() 결과 : {{d}} 중첩된 json 을 읽으려면 dot 으로 연결하여 보면 됨 df.select("a").show() 결과 : {{d}} df.select("a.b").show() 결과 : {d} df.select("a.b.c").show() 결과 : d 그 외 메소드 출력 결과 df.columns 결과 : Array(a) df.select("a").columns 결과 : Array(a) df.select("a.b").columns 결과 : Array(b) df.select("a.b.c").columns 결과 : Array(c) df.schema 결과 : StructType(StructField(a, StructType(StructField(b, StructType(StructField(c, StringType, true)),true)),true)) df.select("a").schema 결과 : StructType(StructField(a, StructType(StructField(b, StructType(StructField(c, StringType, true)), true)), true)) df.select("a.b").schema 결과 : StructType(StructField(b, StructType(StructField(c, StringType, true)), true)) df.select("a.b.c").schema 결과 : StructType(StructField(c,StringType, true)) |
다른 예를 살펴봄 아래와 같이 key 값이 dot 으로 연결된 json 을 읽는다고 하자 < /eyeballs/path/sample2.json > { "a.a" : "b.b" } 읽을 때 key 값을 ` 로 감싸서 읽어야 함 val df = spark.read.option("multiline", "true").json("/eyeballs/path/sample2.json") df.select("`a.a`").show() 결과 : b.b |
다른 예를 살펴봄 필터를 추가해서 내가 원하는 json 부분만 남길 수 있음 아래와 같은 json 이 있고, 값이 o 인 것만 추출한다고 하자 </eyeballs/path/sample3.json> { "a" : "x", "b" : "x", "c" : "o", "d" : "o", "e" : "x" } val df = spark.read.option("multiline","true").json("/eyeballs/path/sample3.json") val onlyO = for { column <- df.columns if( df.select(column).collect()(0)(0).toString == "o" ) } yield column 결과 : Array(c,e) |
적극 참고한 곳 : 스파크 완벽 가이드 (2018년)
아래는 내가 삽질한 결과임
나는 pyspark 를 이용하여 작업하고 있었다.
json string 은 왠지 dict 로 변경해야 할 것 같았고,
책에서 python 의 dict 는 spark 의 MapType 으로 대응된다고 해서
json string 값도 역시 MapType 으로 변경해야하는구나 싶었다.
rdd 의 map 을 이용하여 json.loads 메소드를 사용해보려고도 했다.
구글링 할 때 how to json string to map type 등으로 검색하곤 했는데...
내가 원하는 결과는 죽어도 안 나오고,
그나마 몇 개 찾은 문서를 따라 진행해보면 자꾸 type error 를 뿜어냈다.
나는 pyspark 가 고장난 줄 알았다 ㅇㅇ;;
결과적으로 내가 만든 output 은 아래와 같은 것이었다.
from pyspark.sql.functions import from_json, col json_schema = spark.read.json(jsonDF.rdd.map(lambda x:x[0])).schema jsonDF = jsonDF.withColumn("json", from_json(col("jsonString"), json_schema)) jsonDF.show() +--------------------+-------------+ | jsonString| json| +--------------------+-------------+ |{"myJSONKey" : {"...|[[[1, 2, 3]]]| +--------------------+-------------+ jsonDF.dtypes [('jsonString', 'string'), ('json', 'struct<myJSONKey:struct<myJSONValue:array<bigint>>>')] |
저기 뒤에 있는 기나긴 stuctType 타입이 보이는가...?
난 도데체 뭘 만든거지...?
저 상태에서 계산 등에 이용하려고 시도하면
type 이 맞질 않는다는 에러가 난다.
사용하려고 하면 다음과 같이 사용할 수 있다.
value = jsonDF.selectExpr("json['myJSONKey']") value.show() +-------------+ |json.myJSONKey| +-------------+ |[[[1, 2, 3]]]| +-------------+ |
책에서는 저렇게 흉측한 stuctType 을 갖는 json 을 문자열(즉 맨 위에서 말하는 json string)로 되돌릴 수 있다고 한다.
어떻게? to_json 함수를 이용해서.
< pyspark > from pyspark.sql.functions import to_json result = jsonDF.select(to_json(col("json"))) |
< scala > import org.apache.spark.sql.functions.to_json result = jsonDF.select(to_json(col("json"))) |
하루동안 삽질을 했더니 팔이 너무 아프다.
참고했던 곳
sparkbyexamples.com/pyspark/pyspark-json-functions-with-examples/
stackoverflow.com/questions/40957585
stackoverflow.com/questions/49675860
velog.io/@crescent702/RDD-map-filter-외부-모듈-함수-사용하기-in-pyspark
'Spark' 카테고리의 다른 글
[PySpark] 여러 path 에서 데이터 읽는 방법 (0) | 2021.06.16 |
---|---|
[Spark] SQL Built-in Functions 문서 링크 (0) | 2021.06.16 |
[PySpark] Python DataFrame 다양한 연산 모음 (0) | 2021.04.05 |
[Spark] Standalone + Hadoop 설치 방법 (0) | 2021.03.26 |
[Spark] 디버깅, 로그 보는 방법 링크(영어) (0) | 2021.03.24 |