column 내용이 json 이고 타입은 string 인 dataframe 에서

json 의 내용을 추출하고자 한다.

 

예를 들어 다음과 같은 dataframe 이 있다고 하자.

 


< pyspark >

from pyspark.sql.functions import get_json_object


# 테스트용 dataframe 생성
jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

#show 를 해보면 json 값이 string 타입으로 들어가있다.
jsonDF.show()
+--------------------+
|          jsonString|
+--------------------+
|{"myJSONKey" : {"...|
+--------------------+

#타입은 다음과 같이 확인할 수 있다.
jsonDF.dtypes
[('jsonString', 'string')]

#json string 에서 값을 꺼내려면 다음과 같이 get_json_object 를 사용한다.
result = jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("result")
)

#결과값으로 두번째 값인 2가 나온다.
result.show()
+------+
|result|
+------+
|     2|
+------+


< scala >

import org.apache.spark.sql.functions.get_json_object

val jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
result = jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column"
)

 

만약 위에처럼 json 내부에 또 다른 json 이 있는 nested 형태가 아니라

그냥 json 이라면, json_tuple 을 사용할 수 있다.

예를 들어,

 


< pyspark >

from pyspark.sql.functions import get_json_object, json_tuple
result = jsonDF.select(json_tuple(col("jsonString"), "myJSONKey"))


< scala >

import org.apache.spark.sql.functions.json_tuple
result = jsonDF.select(col("jsonString"), "myJSONKey")

 

sql 을 사용할 수 있는 selectExpr 를 이용하면 다음과 같이 처리할 수 있다.

 


result = jsonDF.selectExpr(

    "get_json_object(jsonString, '$.myJSONKey.myJSONValue[1]') as column",
    "json_tuple(jsonString, 'myJSONKey')"
)

 

하나가 아니라 여러개의 value 를 가져오고 싶다면 다음과 같이 * 을 사용하면 된다.

 


.....
converted = inputDF\
  .withColumn("areaCode", get_json_object("phones", "$[*].areaCode"))\
  .withColumn("phones", explode(from_json("phones", phone_schema)))\
  .withColumn("phone", col("phones.phone"))\
  .drop("phones")\
  .filter(~isnull("phone"))
.....

출처 : https://stackoverflow.com/a/47944084

 

 

 

 

 


다른 예제 살펴보자.
아래와 같은 json 이 있다고 하자

< /eyeballs/path/sample.json >
{
  "a" : {
    "b" : {
      "c" : "d"
    }
  }
}

이걸 spark 에서 읽는 방법은 아래와 같음

val df = spark.read.option("multiline", "true").json("/eyeballs/path/sample.json")

이렇게 읽은 df 를 show 하면 이상하게 말단에 있는 값만 뜸

df.show()
결과 : {{d}}

중첩된 json 을 읽으려면 dot 으로 연결하여 보면 됨

df.select("a").show()
결과 : {{d}}

df.select("a.b").show()
결과 : {d}

df.select("a.b.c").show()
결과 : d

그 외 메소드 출력 결과

df.columns
결과 : Array(a)

df.select("a").columns
결과 : Array(a)

df.select("a.b").columns
결과 : Array(b)

df.select("a.b.c").columns
결과 : Array(c)

df.schema
결과 : StructType(StructField(a, StructType(StructField(b, StructType(StructField(c, StringType, true)),true)),true))

df.select("a").schema
결과 : StructType(StructField(a, StructType(StructField(b, StructType(StructField(c, StringType, true)), true)), true))

df.select("a.b").schema
결과 : StructType(StructField(b, StructType(StructField(c, StringType, true)), true))

df.select("a.b.c").schema
결과 : StructType(StructField(c,StringType, true))


 


다른 예를 살펴봄
아래와 같이 key 값이 dot 으로 연결된 json 을 읽는다고 하자

< /eyeballs/path/sample2.json >
{
  "a.a" : "b.b"
}

읽을 때 key 값을 ` 로 감싸서 읽어야 함

val df = spark.read.option("multiline", "true").json("/eyeballs/path/sample2.json")
df.select("`a.a`").show()
결과 : b.b

 

 

 


다른 예를 살펴봄
필터를 추가해서 내가 원하는 json 부분만 남길 수 있음

아래와 같은 json 이 있고, 값이 o 인 것만 추출한다고 하자

</eyeballs/path/sample3.json>
{
  "a" : "x",
  "b" : "x",
  "c" : "o",
  "d" : "o",
  "e" : "x"
}

val df = spark.read.option("multiline","true").json("/eyeballs/path/sample3.json")
val onlyO = for {
  column <- df.columns
  if( df.select(column).collect()(0)(0).toString == "o" )
} yield column

결과 : Array(c,e)


 

 

 

 

적극 참고한 곳 : 스파크 완벽 가이드 (2018년)

 

 

아래는 내가 삽질한 결과임

 

나는 pyspark 를 이용하여 작업하고 있었다.

json string 은 왠지 dict 로 변경해야 할 것 같았고,

책에서 python 의 dict 는 spark 의 MapType 으로 대응된다고 해서

json string 값도 역시 MapType 으로 변경해야하는구나 싶었다.

rdd 의 map 을 이용하여 json.loads 메소드를 사용해보려고도 했다.

구글링 할 때 how to json string to map type 등으로 검색하곤 했는데...

 

내가 원하는 결과는 죽어도 안 나오고,

그나마 몇 개 찾은 문서를 따라 진행해보면 자꾸 type error 를 뿜어냈다.

나는 pyspark 가 고장난 줄 알았다 ㅇㅇ;;

 

결과적으로 내가 만든 output 은 아래와 같은 것이었다.

 


from pyspark.sql.functions import from_json, col


json_schema = spark.read.json(jsonDF.rdd.map(lambda x:x[0])).schema
jsonDF = jsonDF.withColumn("json", from_json(col("jsonString"), json_schema))
jsonDF.show()
+--------------------+-------------+
|          jsonString|         json|
+--------------------+-------------+
|{"myJSONKey" : {"...|[[[1, 2, 3]]]|
+--------------------+-------------+



jsonDF.dtypes
[('jsonString', 'string'), ('json', 'struct<myJSONKey:struct<myJSONValue:array<bigint>>>')]


 

저기 뒤에 있는 기나긴 stuctType 타입이 보이는가...?

난 도데체 뭘 만든거지...?

저 상태에서 계산 등에 이용하려고 시도하면 

type 이 맞질 않는다는 에러가 난다.

 

사용하려고 하면 다음과 같이 사용할 수 있다.

 


value = jsonDF.selectExpr("json['myJSONKey']")
value.show()
+-------------+
|json.myJSONKey|
+-------------+
|[[[1, 2, 3]]]|
+-------------+

 

책에서는 저렇게 흉측한 stuctType 을 갖는 json 을 문자열(즉 맨 위에서 말하는 json string)로 되돌릴 수 있다고 한다.

어떻게? to_json 함수를 이용해서.

 


< pyspark >

from pyspark.sql.functions import to_json

result = jsonDF.select(to_json(col("json")))


< scala >

import org.apache.spark.sql.functions.to_json

result = jsonDF.select(to_json(col("json")))

 

 

하루동안 삽질을 했더니 팔이 너무 아프다.

 

 

 

 

 

 

참고했던 곳

sparkbyexamples.com/pyspark/pyspark-json-functions-with-examples/

 

stackoverflow.com/a/45880574

stackoverflow.com/a/45039678

 

stackoverflow.com/questions/40957585

stackoverflow.com/questions/49675860

kontext.tech/column/spark/284/pyspark-convert-json-string-column-to-array-of-object-structtype-in-data-frame

sparkbyexamples.com/pyspark/different-ways-to-create-dataframe-in-pyspark/#sparksession-createdataframe

velog.io/@crescent702/RDD-map-filter-외부-모듈-함수-사용하기-in-pyspark

 

 

 

 

 

 

+ Recent posts