MongoDB 에서 읽은 데이터로 Hadoop Map-Reduce 를 돌리고, 결과를 다시 MongoDB 에 쓰는 것을 해보자.
MongoDB와 Hadoop 을 연결시켜주는 " mongo-hadoop connector " 를 사용한다.
https://github.com/mongodb/mongo-hadoop
아래 page 를 참고해서
core jar 파일과 mongo java driver 를 받아서, hadoop 의 class path 내에 넣는다.
https://github.com/mongodb/mongo-hadoop/wiki/MapReduce-Usage#installation
mongo-java-driver 와 mongo-hadoop-core 는 아래 페이지에서 받는다.
https://mvnrepository.com/artifact/org.mongodb
그러면 hadoop Map-Reduce 에서 해당 lib를 사용하여 MongoDB 에 접근이 가능하다.
아래 코드에서 사용할 데이터는 아래와 같다.
HDFS 일 때
asdgegwegwegw wefewgrgevefrre wegwfkwenfdmi dfdsggwehwefee ..... |
MongoDB 에 저장되어 있을 때
_id data 1 asdgegwegwegw 2 wefewgrgevefrre 3 wegwfkwenfdmi 4 dfdsggwehwefee ..... |
먼저 아주 간단한 Hadoop Map-Reduce Code 를 보자.
이 코드는 읽은 데이터를 그대로 출력하는 코드이다.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Workload_IO {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Worklaod_IO");
job.setJarByClass(Workload_IO.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class Map extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
context.write(new Text(key.toString()), value);
}
}
public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text val = values.iterator().next();
if (val != null) {
context.write(NullWritable.get(), val);
}
}
}
}
|
Map 쪽의 key 값은 Hadoop 이 알아서 만든 값이기 때문에 Reduce 로 모으는 용도로만 쓰고, output 에서는 쓰지 않는다.
output 의 key 값이 NullWritable 이다. 이 말인 즉슨, key 값에는 아무것도 쓰지 않는다는 말이다.
만약 NullWritable 이 아니라 String key = ""; 같은 것을 넣는다면 맨 앞에 \t 이 들어가게 된다.
아무것도 쓰고 싶지 않고 싶을 때 NullWritable.get() 을 사용하면 된다.
HDFS 에서 읽고 쓰는 경우
< Map Input > key 타입 : Object value 타입 : Text
<Map Output> key 타입 : Text value 타입 : Text
<Reduce Input> key 타입 : Text value 타입 : Text
<Reduce Output> key 타입 : NullWritable value 타입 : Text |
똑같은 일을 하지만 대신 HDFS 에서 데이터를 읽고 쓰는 게 아니라 MongoDB 에서 읽고 MongoDB에 쓰는 코드는 아래와 같다.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;
import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;
public class Workload_IO {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
MongoConfigUtil.setInputURI(conf, "mongodb://" + args[0]);
MongoConfigUtil.setOutputURI(conf, "mongodb://" + args[1]);
Job job = Job.getInstance(conf, "Worklaod_IO");
job.setJarByClass(Workload_IO.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Integer.class);
job.setOutputValueClass(BSONWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);
job.waitForCompletion(true);
}
public static class Map extends Mapper<Integer, BasicDBObject, IntWritable, Text> {
public void map(Integer key, BasicDBObject value, Context context) throws IOException, InterruptedException {
context.write(new IntWritable(key), new Text(value.getString("data")));
}
}
public static class Reduce extends Reducer<IntWritable, Text, Integer, BSONWritable> {
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
BasicBSONObject output = new BasicBSONObject();
BSONWritable reduceResult = new BSONWritable();
Text val = values.iterator().next();
if (val != null) {
output.put("value", val.toString());
reduceResult.setDoc(output);
context.write(key.get(), reduceResult);
}
}
}
}
|
다른 점만 bold 처리 해보았다.
하나 하나 살펴보자.
1.
MongoConfigUtil.setInputURI(conf, "mongodb://" + args[0]);
MongoConfigUtil.setOutputURI(conf, "mongodb://" + args[1]);
|
args[0]에는 MongoDB 로부터 데이터를 읽어들일 input path
args[1] MongoDB 에 데이터를 쓸 output path 가 들어간다.
둘 다 표시 형식은 [mongodb의 router ip address]/[mongodb database name].[mongodb collection name] 이 들어간다.
예를 들어
MongoConfigUtil.setInputURI(conf, "mongodb://192.168.0.3/dataset.data"); MongoConfigUtil.setOutputURI(conf, "mongodb://192.168.0.3/dataset.out"); |
mongo router 는 192.168.0.3 의 ip 주소를 갖고 있다.
dataset 이라는 데이터베이스 내에 data 라는 collection 가 있고 여기서 데이터들(documents)를 읽어온다.
dataset 이라는 데이터베이스 내에 out 이라는 collection 에 결과값들을 쓴다.
_id 값이 Map 의 key 값으로 들어가고 그 외의 값들이 Map 의 value 값으로 들어간다.
2.
job.setOutputValueClass(BSONWritable.class);
|
Reduce 의 output value 형식은 BSONWritable.class 여야 한다.
MongoDB 에서 BSON 형식으로 데이터가 이동하기 때문이다.
3.
job.setInputFormatClass(MongoInputFormat.class);
job.setOutputFormatClass(MongoOutputFormat.class);
|
IO 의 포맷은 MongoInputFormat.class, MongoOutputFormat.class 여야 한다.
그래야 MongoDB 에서 읽고 쓸 수 있다.
4.
context.write(new IntWritable(key), new Text(value.getString("data")));
|
Map 에서의 key 값은 MongoDB 의 _id 값이 된다.
BasicDBObject 로 날아온 MongoDB 의 데이터에서
getString, getInteger .. 등의 메소드를 통해 값을 읽어올 수 있다.
가령
{ _id : 2,
name : "eyeballs",
age : 20 }
인 데이터를 저장하고 있는 mongoDB 에서 데이터를 읽었을 때
key 는 2 가 된다.
value.getString("name") 의 return 값은 "eyeballs" 가 된다.
value.getInteger("age") 의 return 값은 20 이 된다.
5.
BasicBSONObject output = new BasicBSONObject();
BSONWritable reduceResult = new BSONWritable();
Text val = values.iterator().next();
if (val != null) {
output.put("value", val.toString());
reduceResult.setDoc(output);
context.write(key.get(), reduceResult);
}
|
BSONWritable 형식으로 context 에 write 하는 방법이다.
output.put 의 첫번째 인자로, MongoDB 에 저장할 데이터 값의 "key" 가 들어가고,
두번째 인자로 저장할 데이터가 들어간다.
MongoDB 에서 읽고 쓰는 경우
< Map Input > key 타입 : Integer (_id 가 1,2,3... 값이었기 때문에) value 타입 : BasicDBObject (MongoDB 에서 읽어올 때 이런 형식으로 )
<Map Output> key 타입 : IntWritable value 타입 : Text
<Reduce Input> key 타입 : IntWritable value 타입 : Text
<Reduce Output> key 타입 : Integer value 타입 : BSONWritable |
쓴 후에 해당 데이터베이스 내의 컬렉션으로 가서 저장이 잘 되었는지 확인하자.
좀 더 많은 비교 코드를 보고 싶다면 아래 git 을 참고하자.
https://github.com/eyeballss/mongo-hadoop