아래 사이트를 적극 참고한다.

https://docs.mongodb.com/spark-connector/master/scala-api/

 

Spark Connector Scala Guide — MongoDB Spark Connector v2.3

Spark Connector Scala Guide Source Code For the source code that contains the examples below, see Introduction.scala. Prerequisites Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation for more det

docs.mongodb.com

 

1. 일단 spark-shell 이 구동될 수 있는 환경을 조성한다(환경을 어떻게 조성하는지는 여기 글에 쓰지 않음).

 

 

 

2. mongo db에 접속할 수 있도록 url 를 만들어둔다(url 을 어떻게 만드는지 여기 글에 쓰지 않음).

 

 

 

3. spark-shell 을 아래 명령어를 통해 접속한다.

 

./bin/spark-shell --master yarn \
                  --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection" \
                  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \
                  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0

 

위의 127.0.0.1 은 mongo db의 url 로 대체하고,

test.myCollection 은 mongodb 내의 사용하길 원하는 데이터가 있는 [database 이름].[collection 이름] 으로 대체한다.

 

--conf 를 이용하여 여러가지 옵션을 줄 수 있다.

옵션 처리는 아래 링크 참고

https://docs.mongodb.com/spark-connector/master/configuration/

 

Configuration Options — MongoDB Spark Connector v2.3

Configuration Options Various configuration options are available for the MongoDB Spark Connector. Specify Configuration Via SparkConf You can specify these options via SparkConf using the --conf setting or the $SPARK_HOME/conf/spark-default.conf file, and

docs.mongodb.com

 

 

 

 

sbt 를 이용하여 연동할 때 (spark-submit)

< build.sbt >

 

version := "1.0"

scalaVersion := "2.11.12"

 

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"

 

artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>

        "io_mongo.jar"

}

 

위의 dependency 를 넣어주면 된다.

 

 

 

4. 인터넷이 연결되어 있다는 가정 하에, mongo-spark-connector 가 알아서 받아진다.

 

 

 

5. 아래 명령어로 com.mongodb.spark._ 를 import 한다.

 

import com.mongodb.spark._

 

 

 

6. 아래 명령어로 count가 잘 되는지 확인한다. 잘 된다면 mongodb랑 잘 연동되었다는 의미.

 

 

val rdd = MongoSpark.load(sc)
println(rdd.count)

 

 

 

 

 

 

7. sbt (spark-submit) 을 이용한 코드 예제는 아래와 같다.

실제 Spark 코드상에서 아래처럼 mongoDB 의 데이터를 가져올 수 있다.

 

import com.mongodb.spark._

import org.bson.Document

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

 

object IO_Mongo {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("mongo-spark integration test")

      val sc = new SparkContext(conf)

      val rdd = MongoSpark.load(sc)

      //여기서 rdd 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document] 

      val df = rdd.toDF()

      df.show()

  }

}

 

 

mongodb에서 읽고, mongodb에 쓰는 방법은 가장 위의 링크를 참고한다.

 

 

 

 

 

 

 

spark-mongo 에서 읽기, 쓰기 mongo uri 를 정하는 법

vi $SPARK_HOME/conf/spark-defaults.conf

spark.mongodb.input.uri=mongodb://[mongo router host ip]:[mongo router host port]/[database name].[collection name]

spark.mongodb.output.uri=mongodb://[mongo router host ip]:[mongo router host port]/[database name].[collection name]


참고

 

 

spark-mongo 에서 캐시를 얼마나 오랫동안 유지할 지 정하는 법

vi $SPARK_HOME/conf/spark-defaults.conf

spark.mongodb.keep_alive_ms      0

0을 넣으면 0ms 동안 캐시를 유지한다.

기본은 5000ms.


참고

 

spark-mongo 에서 readPreference, partitioner 바꾸는 법

vi $SPARK_HOME/conf/spark-defaults.conf

spark.mongodb.input.readPreference.name nearest

spark.mongodb.input.partitioner MongoShardedPartitioner


참고

 

local threshold 바꾸는 법

vi $SPARK_HOME/conf/spark-defaults.conf

spark.mongodb.input.localThreshold 15

기본은 15ms.


참고

 

 

 

 

참고할 만한 곳

https://www.mongodb.com/blog/post/the-new-mongodb-connector-for-apache-spark-in-action-building-a-movie-recommendation-engine

https://docs.databricks.com/_static/notebooks/mongodb.html

https://www.slideshare.net/mongodb/how-to-connect-spark-to-your-own-datasource

 

+ Recent posts