< scala map >
scala> val myList = List("abc", "bcd", "cde")
scala> val reverseList = myList.map((s:String) => {s.reverse})
scala> for(l <- reverseList){println(l)}
결과
cba
dcb
edc
< scala reduce >
scala> val numberList = List(1,2,3,4,5)
scala> val sum = numberList.reduce( (acc:Int, newNum:Int) => acc + newNum )
reduce 는 List 내 모든 원소에 대해 적용됨
reduce 에 들어가는 acc 는 acc+newNum 의 값이 되고
newNum 은 numberList 에서 새로 들어오는 값이 됨
차근차근 살펴보면 다음과 같음
acc : 1, newNum : 2. 결과 acc+newNum = 3 이것이 다시 acc 으로 들어감
acc : 3, newNum : 3. 결과 acc+newNum = 6 이것이 다시 acc 으로 들어감
acc : 6, newNum : 4. 결과 acc+newNum = 10 이것이 다시 acc 으로 들어감
acc : 10, newNum : 5. 결과 acc+newNum = 15
numberList 내의 모든 원소에 대해 전부 적용되었으니 마지막에 반환되는 값은 15
따라서 결과는 15
근데 아래처럼 String concat 으로 바꾸면 위의 논리와 맞지 않는 결과가 생김
scala> val ml = List("a","b","c")
scala> ml.reduce((x:String, y:String) => x+y)
결과 abc ????
reduce 를 어떻게 이해해야하지...??
추가) map 혹은 reduce 는 scala 자체가 갖고 있는 method 이기도 하지만
spark 가 갖고 있는 method 이기도 함
둘 다 동일하게 동작하지만 분산 처리 할 때는 spark 의 map, reduce 를 사용해야 함
< scala filter >
scala> val filterFives = List(1,2,5,3,4,5,6).filter((x:Int) => x!=5)
scala> val filterFives = List(1,2,5,3,4,5,6).filter( _!=5 )
filter 내부에는 boolean 값이 들어가나 봄
boolean 값이 true 인 원소만 남는 것 같음
또한 underline ( ' _ ' ) 을 사용하여 리스트 내의 모든 원소를 대신 표현 가능함
결과
List(1, 2, 3, 4, 6)
< scala List 연결(concat) >
++ 를 사용
scala> val concat = List(1,2,3)++List(4,5,6)
결과
List(1, 2, 3, 4, 5, 6)
< scala List 여러 함수 >
scala> List(1,2,3,4).reverse
결과 List(4, 3, 2, 1)
scala> List(3,2,4,1).sorted
결과 List(1, 2, 3, 4)
scala> List(3,2,2,3,2,1).distinct
결과 List(3, 2, 1)
scala> List(1,2,3).max
결과 Int = 3
scala> List(1,2,3).sum
결과 Int = 6
scala> List(1,2,3).contains(2)
결과 Boolean = true
< 자바에서의 hashMap >
스칼라의 Map 은 파이썬의 dict, 자바의 hashMap
scala> val myMap = Map("A" -> "a", "B" -> "b", "C" -> "c", "D" -> "d")
scala> myMap("C")
결과 String = c
scala> myMap.contains("E")
결과 Boolean = false
scala> util.Try(myMap("E")) getOrElse "Unknown"
결과 String = Unknown
< Spark Session builder >
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("EyeballsTest").master("local[*]").config("spark.some.config.option", "some-value").getOrCreate()
https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession
< csv 파일 읽기 >
import spark.implicits._
val myCsv = spark.read.option("header", "true").option("inferSchema", "true").csv("a/b/c.csv")
< tsv 파일 읽기 >
import spark.implicits._
val myCsv = spark.read.option("header", "true").option("sep","\t").option("inferSchema", "true").csv("a/b/t.tsv")
< txt 파일 읽기 >
txt 파일을 읽은 후 column 의 이름은 "value" 로 고정되어 사용됨
import spark.implicits._
val myTxt = spark.read.txt("a/b/d.txt")
myTxt.select("value").show(3)
< dataframe 에 sql 쿼리 적용 >
sql 쿼리 적용을 위해 dataframe 에 대한 tempview 를 만들어야 함
myCsv.createOrReplaceTempView("tempTable")
val fives = spark.sql("SELECT * FROM tempTable WHERE key==5")
< select >
people.select(people("name"), people("age") + 10).show(3)
< filter >
people.filter(people("age") < 21).show(3)
< sort >
people.sort("age").show(3)
people.sort($"age".desc).show(3)
< groupBy >
people.groupBy("age").avg("friends").show(3)
people.groupBy("age").agg(round(avg("friends"), 2)).show(3)
people.groupBy("age").agg(round(avg("friends"), 2).alias("rounds")).sort("age").show(3)
agg 함수를 통해 groupBy 이후 집계 처리가 가능
temps.groupBy("station").min("temperature").show()
groupBy 로 묶은 값 중 가장 작은 값(혹은 가장 큰 값)을 집계 가능
< show 로 전체 컬럼 출력하기 >
show 에 입력되는 파라미터를 column 의 개수로 주면 됨
예를 들어
people.show(people.count.toInt)
< explode and split >
split 을 통해 각 column 의 값을 정규식으로 분할
split 을 통해 분할된 각 값을 (explode 를 통해) 각 행(row) 으로 만들고 그 이름을 word 라고 칭함
people.select(
explode(
split("value", "\\W+")
).alias("word")
).filter("word" =!= "")
.show(3)
< udf 대상으로 broadcast 변수 사용하기 >
//broadcast 변수에 넣을 함수 선언
def loadMovieNames() : Map[Int, String] = {
...
}
//위에서 선언한 함수를 broadcat 변수에 넣음
val nameDict = spark.sparkContext.broadcast(loadMovieNames())
//익명함수 선언
val lookupName : Int => String = (movieID:Int) => {
nameDict.value(movieID)
}
//위에서 선언한 익명함수를 udf 로 래핑
import org.apache.spark.sql.functions.udf
val lookupNameUDF = udf(lookupName)
//udf 의 결과값을 column 에 추가
import org.apache.spark.sql.functions.col
movieContents.withColumn("movieTitle", lookupNameUDF( col("movieID") ))
spark 의 각 executor 에 nameDict 를 배포했기 때문에
바로 위 withColumn 의 lookupNameUDF 를 실행할 때
각 executor 가 로컬로 갖고 있는 nameDict 함수를 그대로 사용 가능
< self join >
아래 코드에서 ratings 는 DataSet
as 를 사용하여 join 시 별명을 붙여 알아보기 쉽게 join 가능
val joinResult = ratings.as("ratings1")
.join( ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" )
.select(
$"ratings1.movieId".alias("movie"),
$"ratings2.userId",alias("user")
)
< SparkSession 닫기 >
spark.stop()
'Spark' 카테고리의 다른 글
[PySpark] sample dataframe 만들기 (0) | 2022.05.09 |
---|---|
[PySpark] 컬럼의 합 구하는 방법 (0) | 2021.06.16 |
[PySpark] 여러 path 에서 데이터 읽는 방법 (0) | 2021.06.16 |
[Spark] SQL Built-in Functions 문서 링크 (0) | 2021.06.16 |
[Spark] json string 값을 갖는 column 에서 json 값 추출하는 방법 + 삽질의 결과 (0) | 2021.04.29 |