Spark cluster 가 AWS Cloud 상에 있지 않고 내 Local Server 위에서 동작하고 있는 환경에서

AWS S3 에서 데이터를 읽어오는 방법에 대해 설명한다.

 

Hadoop 이 설치되어 있어야 한다. Hadoop 에서 제공하는 s3aFileSystem 을 사용하기 때문이다.

dependency 문제를 벗어나기 위해 되도록 최신 버전을 사용한다.

 

Ubuntu 위에서 진행하였다.

 

 

 

 

 

1.

AWS Access Key 와 Secret Key 를 생성한다.

아래를 참고한다.

http://pyrasis.com/book/TheArtOfAmazonWebServices/Chapter09

 

아마존 웹 서비스를 다루는 기술 9장 - API와 툴 사용을 위한 액세스 키 생성하기

 

pyrasis.com

 

 

 

 

 

 

2.

s3 버킷을 만들고 데이터를 넣는다.

나의 경우 

[a b c] 가 있는 eye.txt 파일을 넣었다.

 

 

 

 

 

 

 

 

3.

올린 파일을 클릭하면 아래와 같이 뜨는데, 여기 경로 복사를 눌러 해당 파일로의 경로를 클립보드에 복사해둔다.

s3://..../eye.txt 같은 모양.

 

 

 

 

 

4.

spark project 를 위한 folder 를 하나 만든다. 위치는 어디든 상관 없음.

그 directory 내에 S3.scala 라는 이름으로 scala 코드를 하나 만들고 아래 코드를 넣는다.

아래 bold 해 둔 곳에 자신의 1번에서 만든 access key, secret key와 3번에서 복사한 파일 경로를 넣는다.

파일의 경로는 s3://... 모양이지만

a를 붙여서 s3a://... 로 시작하도록 한다.

 

< S3.scala >

 

import org.apache.spark.sql.SparkSession

object S3{

  def main(args: Array[String]) {

    val spark = SparkSession.builder.appName("Spark with S3").getOrCreate()

    import spark.sqlContext.implicits._

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "[1번에서 만든 access key]")

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "[1번에서 만든 secret key]")

    val text = spark.sparkContext.textFile("s3a://.../eye.txt")

    val result = text.take(1)

    println(">>> " + result(0))

  }

}

 

 

 

 

 

 

5.

build.sbt 파일을 하나 만들고 아래 내용을 넣는다.

scalaVersion 은 자신의 컴퓨터에 깔려있는 버전을 사용하면 된다.

 

< build.sbt >

 

 

version := "1.0"

scalaVersion := "2.11.12"

 

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

 

artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>

        "spark_s3.jar"

}

 

 

 

 

 

 

6.

lib 라는 이름의 folder 를 하나 만든다.

그 안에 아래 jar 파일들을 모두 받는다.

가장 아래 hadoop-aws jar 는 자신의 hadoop 버전에 맞는 것으로 받는다.

 

wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.11.695/aws-java-sdk-1.11.695.jar

wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.695/aws-java-sdk-s3-1.11.695.jar

wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.695/aws-java-sdk-core-1.11.695.jar

wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.695/aws-java-sdk-dynamodb-1.11.695.jar 

wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.1/hadoop-aws-3.1.1.jar

 

 

dependency 얻는 곳 :

https://mvnrepository.com/artifact/com.amazonaws

https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

 

 

 

 

 

7.

S3.scala 와 build.sbt, 그리고 lib folder 를 만들고 나면 아래처럼 보일 것이다.

 

 

아래 명령어를 통해 dependencies 를 받고 컴파일한다.

 

sbt package 

 

sbt 설치 방법은 여기 참고.

 

[success] 가 뜨면 성공

 

 

 

 

 

8.

./target/scala-2.11 로 이동한다.

compile 된 jar 가 보인다.

 

아래 spark-submit 명령으로 실행하여 take(1) 의 결과를 확인해본다.

 

spark-submit --class S3 --master yarn --jars "/root/code/spark_s3/lib/*" spark_s3.jar

 

위의 jars 의 path 는 6번에서 만든 lib 의 path 이다.

 

결과가 잘 뜬다.

 

 

 

 

 

 

 

 

 

spark-shell 에서 돌려보면 read 했을 때 Rdd 형태로 가져오는 것을 볼 수 있다.

 

 

 

 

 

 

 

 

 

 

 

s3 에서 가져온 데이터를 wordcount 하는 코드는 아래와 같다.

먼저 처음 가져온 RDD 상태 그대로 word count 하는 코드

 

...

val text = spark.sparkContext.textFile("s3a://...../eye.txt")
val counts = text.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

...

 

 

 

RDD 를 dataframes 로 바꿔서 word count 하는 코드

 

...

val text = spark.sparkContext.textFile("s3a://.../eye.txt").toDF()
val counts = text.as[String].flatMap(_.split(" ")).groupBy("value").count()

...

 

 

bucket 내의 모든 파일들을 읽으려면 아래처럼 wild card 를 사용한다.

예를 들어 bucket 이름이 mybucket 이고 그 안의 여러 text 파일들이 있다고 할 때 아래처럼 코딩하면 된다.

 

val text = spark.sparkContext.textFile("s3a://mybucket/*")

 

 

 

 

 

 

 

 

 

 

 

 

참고

 

s3, s3n, s3a 차이 : https://swalloow.github.io/aws-emr-s3-spark

 

성능 튜닝 : https://aajisaka.github.io/hadoop-document/hadoop-project/hadoop-aws/tools/hadoop-aws/performance.html

 

각종 옵션 넣는 방법 : https://www.slideshare.net/ssuserca76a5/amazon-s3-best-practice-and-tuning-for-hadoopspark-in-the-cloud

 

참고했던 곳들 : 

 

https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/spark_s3.html

 

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html

 

https://yahwang.github.io/posts/84

 

https://markobigdata.com/2017/04/23/manipulating-files-from-s3-with-apache-spark/

 

https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark

http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/

 

https://supergloo.com/spark/apache-spark-amazon-s3/

 

https://qkqhxla1.tistory.com/993

 

https://stackoverflow.com/questions/47652281/trying-to-read-and-write-parquet-files-from-s3-with-local-spark

 

https://cnpnote.tistory.com/entry/HADOOP-Spark-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0%EC%9D%98-S3%EC%97%90%EC%84%9C-%EC%9D%BD%EA%B8%B0-%EC%9E%91%EC%97%85%EC%9D%84-%EC%88%98%ED%96%89%ED%95%98%EB%A9%B4-IllegalAccessError%EA%B0%80-%EB%B0%9C%EC%83%9D%ED%95%A9%EB%8B%88%EB%8B%A4-MutableCounterLong-duplicate-%EB%A9%94%EC%86%8C%EB%93%9C%EC%97%90-%EC%95%A1%EC%84%B8%EC%8A%A4%ED%95%98%EB%A0%A4%EA%B3%A0%ED%96%88%EC%8A%B5%EB%8B%88%EB%8B%A4

 

 

 

+ Recent posts