< scala map >

 

scala> val myList = List("abc", "bcd", "cde")
scala> val reverseList = myList.map((s:String) => {s.reverse})
scala> for(l <- reverseList){println(l)}

 

결과

 

cba
dcb
edc

 

 

 

 

< scala reduce >

 

scala> val numberList = List(1,2,3,4,5)

scala> val sum = numberList.reduce( (acc:Int, newNum:Int) => acc + newNum )

 

reduce 는 List 내 모든 원소에 대해 적용

reduce 에 들어가는 acc 는 acc+newNum 의 값이 되고

newNum 은 numberList 에서 새로 들어오는 값이 됨

 

차근차근 살펴보면 다음과 같음

acc : 1, newNum : 2. 결과 acc+newNum = 3 이것이 다시 acc 으로 들어감

acc : 3, newNum : 3. 결과 acc+newNum = 6 이것이 다시 acc 으로 들어감

acc : 6, newNum : 4. 결과 acc+newNum = 10 이것이 다시 acc 으로 들어감

acc : 10, newNum : 5. 결과 acc+newNum = 15

 

numberList 내의 모든 원소에 대해 전부 적용되었으니 마지막에 반환되는 값은 15

따라서 결과는 15

 

근데 아래처럼 String concat 으로 바꾸면 위의 논리와 맞지 않는 결과가 생김

scala> val ml = List("a","b","c")
scala> ml.reduce((x:String, y:String) => x+y)
결과 abc ????

 

reduce 를 어떻게 이해해야하지...??

 

추가) map 혹은 reduce 는 scala 자체가 갖고 있는 method 이기도 하지만

spark 가 갖고 있는 method 이기도 함

둘 다 동일하게 동작하지만 분산 처리 할 때는 spark 의 map, reduce 를 사용해야 함

 

 

 

< scala filter >

 

scala> val filterFives = List(1,2,5,3,4,5,6).filter((x:Int) => x!=5)

scala> val filterFives = List(1,2,5,3,4,5,6).filter( _!=5 )

 

filter 내부에는 boolean 값이 들어가나 봄

boolean 값이 true 인 원소만 남는 것 같음

 

또한 underline ( ' _ ' ) 을 사용하여 리스트 내의 모든 원소를 대신 표현 가능함

 

결과

List(1, 2, 3, 4, 6)

 

 

 

 

< scala List 연결(concat) >

 

++ 를 사용

 

scala> val concat = List(1,2,3)++List(4,5,6)

 

결과

List(1, 2, 3, 4, 5, 6)

 

 

 

 

< scala List 여러 함수 >

 

scala> List(1,2,3,4).reverse
결과 List(4, 3, 2, 1)

scala> List(3,2,4,1).sorted
결과 List(1, 2, 3, 4)

scala> List(3,2,2,3,2,1).distinct

결과 List(3, 2, 1)

 

scala> List(1,2,3).max
결과 Int = 3

scala> List(1,2,3).sum
결과 Int = 6

scala> List(1,2,3).contains(2)
결과 Boolean = true

 

 

 

 

 

 

< 자바에서의 hashMap >

 

스칼라의 Map 은 파이썬의 dict, 자바의 hashMap

 

scala> val myMap = Map("A" -> "a", "B" -> "b", "C" -> "c", "D" -> "d")
scala> myMap("C")

 

결과 String = c

 

scala> myMap.contains("E")
결과 Boolean = false

 

scala> util.Try(myMap("E")) getOrElse "Unknown"
결과 String = Unknown

 

 

 

 

 

< Spark Session builder >

 

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("EyeballsTest").master("local[*]").config("spark.some.config.option", "some-value").getOrCreate()

https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession
 

 

 

 

< csv 파일 읽기 >

 

import spark.implicits._

val myCsv = spark.read.option("header", "true").option("inferSchema", "true").csv("a/b/c.csv")

 

 

< tsv 파일 읽기 >

 

import spark.implicits._

val myCsv = spark.read.option("header", "true").option("sep","\t").option("inferSchema", "true").csv("a/b/t.tsv")

 

 

< txt 파일 읽기 >

 

txt 파일을 읽은 후 column 의 이름은 "value" 로 고정되어 사용됨

import spark.implicits._

val myTxt = spark.read.txt("a/b/d.txt")

myTxt.select("value").show(3)

 

 

 

 

 

 

< dataframe 에 sql 쿼리 적용 >

 

sql 쿼리 적용을 위해 dataframe 에 대한 tempview 를 만들어야 함

 

myCsv.createOrReplaceTempView("tempTable")

val fives = spark.sql("SELECT * FROM tempTable WHERE key==5")

 

 

 

 

< select >

 

people.select(people("name"), people("age") + 10).show(3)

 

 

 

 

< filter >

 

people.filter(people("age") < 21).show(3)

 

 

 

 

< sort >

 

people.sort("age").show(3)

people.sort($"age".desc).show(3)

 

 

 

 

< groupBy >

 

people.groupBy("age").avg("friends").show(3)

people.groupBy("age").agg(round(avg("friends"), 2)).show(3)

people.groupBy("age").agg(round(avg("friends"), 2).alias("rounds")).sort("age").show(3)

 

agg 함수를 통해 groupBy 이후 집계 처리가 가능

 

temps.groupBy("station").min("temperature").show()

 

groupBy 로 묶은 값 중 가장 작은 값(혹은 가장 큰 값)을 집계 가능

 

 

 

 

 

 

< show 로 전체 컬럼 출력하기 >

 

show 에 입력되는 파라미터를 column 의 개수로 주면 됨

예를 들어

people.show(people.count.toInt)

 

 

 

 

 

 

 

< explode and split >

 

split 을 통해 각 column 의 값을 정규식으로 분할

split 을 통해 분할된 각 값을 (explode 를 통해) 각 행(row) 으로 만들고 그 이름을 word 라고 칭함

 

people.select(

    explode(

        split("value", "\\W+")

    ).alias("word")

).filter("word" =!= "")

.show(3)

 

 

 

 

 

< udf 대상으로 broadcast 변수 사용하기 >

 

//broadcast 변수에 넣을 함수 선언

def loadMovieNames() : Map[Int, String] = {

...

}

 

//위에서 선언한 함수를 broadcat 변수에 넣음

val nameDict = spark.sparkContext.broadcast(loadMovieNames())

 

//익명함수 선언

val lookupName : Int => String = (movieID:Int) => {

    nameDict.value(movieID)

}

 

//위에서 선언한 익명함수를 udf 로 래핑

import org.apache.spark.sql.functions.udf

val lookupNameUDF = udf(lookupName)

 

//udf 의 결과값을 column 에 추가

import org.apache.spark.sql.functions.col

movieContents.withColumn("movieTitle", lookupNameUDF( col("movieID") ))

 

spark 의 각 executor 에 nameDict 를 배포했기 때문에

바로 위 withColumn 의 lookupNameUDF 를 실행할 때

각 executor 가 로컬로 갖고 있는 nameDict 함수를 그대로 사용 가능

 

 

 

< self join >

 

아래 코드에서 ratings 는 DataSet

as 를 사용하여 join 시 별명을 붙여 알아보기 쉽게 join 가능

 

val joinResult = ratings.as("ratings1")

.join( ratings.as("ratings2"), $"ratings1.userId" === $"ratings2.userId" )

.select(

  $"ratings1.movieId".alias("movie"),

  $"ratings2.userId",alias("user")

)

 

 

 

 

 

 

< SparkSession 닫기 >

 

spark.stop()

 

 

 

 

 

 

 

 

+ Recent posts