아래 사이트를 적극 참고한다.
https://docs.mongodb.com/spark-connector/master/scala-api/
Spark Connector Scala Guide — MongoDB Spark Connector v2.3
Spark Connector Scala Guide Source Code For the source code that contains the examples below, see Introduction.scala. Prerequisites Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation for more det
docs.mongodb.com
1. 일단 spark-shell 이 구동될 수 있는 환경을 조성한다(환경을 어떻게 조성하는지는 여기 글에 쓰지 않음).
2. mongo db에 접속할 수 있도록 url 를 만들어둔다(url 을 어떻게 만드는지 여기 글에 쓰지 않음).
3. spark-shell 을 아래 명령어를 통해 접속한다.
| ./bin/spark-shell --master yarn \ --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection" \ --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \ --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 |
위의 127.0.0.1 은 mongo db의 url 로 대체하고,
test.myCollection 은 mongodb 내의 사용하길 원하는 데이터가 있는 [database 이름].[collection 이름] 으로 대체한다.
--conf 를 이용하여 여러가지 옵션을 줄 수 있다.
옵션 처리는 아래 링크 참고
https://docs.mongodb.com/spark-connector/master/configuration/
Configuration Options — MongoDB Spark Connector v2.3
Configuration Options Various configuration options are available for the MongoDB Spark Connector. Specify Configuration Via SparkConf You can specify these options via SparkConf using the --conf setting or the $SPARK_HOME/conf/spark-default.conf file, and
docs.mongodb.com
sbt 를 이용하여 연동할 때 (spark-submit)
|
< build.sbt >
version := "1.0" scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"
artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) => "io_mongo.jar" }
|
위의 dependency 를 넣어주면 된다.
4. 인터넷이 연결되어 있다는 가정 하에, mongo-spark-connector 가 알아서 받아진다.
5. 아래 명령어로 com.mongodb.spark._ 를 import 한다.
| import com.mongodb.spark._ |
6. 아래 명령어로 count가 잘 되는지 확인한다. 잘 된다면 mongodb랑 잘 연동되었다는 의미.
|
val rdd = MongoSpark.load(sc)
|
7. sbt (spark-submit) 을 이용한 코드 예제는 아래와 같다.
실제 Spark 코드상에서 아래처럼 mongoDB 의 데이터를 가져올 수 있다.
|
import com.mongodb.spark._ import org.bson.Document import java.util.ArrayList import scala.collection.JavaConversions._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf
object IO_Mongo { def main(args: Array[String]) { val conf = new SparkConf().setAppName("mongo-spark integration test") val sc = new SparkContext(conf) val rdd = MongoSpark.load(sc) //여기서 rdd 는 com.mongodb.spark.rdd.MongoRDD[org.bson.Document] val df = rdd.toDF() df.show() } }
|
mongodb에서 읽고, mongodb에 쓰는 방법은 가장 위의 링크를 참고한다.
spark-mongo 에서 읽기, 쓰기 mongo uri 를 정하는 법
|
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.input.uri=mongodb://[mongo router host ip]:[mongo router host port]/[database name].[collection name] spark.mongodb.output.uri=mongodb://[mongo router host ip]:[mongo router host port]/[database name].[collection name] |
spark-mongo 에서 캐시를 얼마나 오랫동안 유지할 지 정하는 법
|
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.keep_alive_ms 0 0을 넣으면 0ms 동안 캐시를 유지한다. 기본은 5000ms. |
spark-mongo 에서 readPreference, partitioner 바꾸는 법
|
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.input.readPreference.name nearest spark.mongodb.input.partitioner MongoShardedPartitioner |
local threshold 바꾸는 법
|
vi $SPARK_HOME/conf/spark-defaults.conf spark.mongodb.input.localThreshold 15 기본은 15ms. |
참고할 만한 곳
https://docs.databricks.com/_static/notebooks/mongodb.html
https://www.slideshare.net/mongodb/how-to-connect-spark-to-your-own-datasource
'Spark' 카테고리의 다른 글
| [Spark] scala rdd 이용한 pagerank 알고리즘 이해하기 (0) | 2019.06.14 |
|---|---|
| [Spark] RDD 에 대한 구체적인 설명 링크 (0) | 2019.05.23 |
| [Spark] Ubuntu에 sbt 설치하는 법 (0) | 2019.05.21 |
| [Spark] There are 0 datanode(s) running and no node(s) are excluded in this operation. 에러 (0) | 2019.05.20 |
| [Spark] mongo lib 다운받는 곳 (0) | 2019.05.20 |