Hadoop MapReduce 를 실행할 때 데이터를 MongoDB 에서 가져올 수 있다.


자세한 사항은 https://eyeballs.tistory.com/6 참고.




MongoDB 를 사용하는 MapReduce 에서 Mongo 로 쿼리를 보내는 방법을 알아보자.


몽고의 dataset.data 에서 데이터를 가져오고,


data.out 에 데이터를 저장한다.


dataset.data 에 저장된 데이터

{ "_id" : 1, "value" : "D6DtsdEtDF" }

{ "_id" : 2, "value" : "jsJ2432NEH" }

{ "_id" : 3, "value" : "E0nqwqILXR" }

{ "_id" : 4, "value" : "lwLq7LD9wx" }

{ "_id" : 5, "value" : "pMpHXyjEp8" }

{ "_id" : 6, "value" : "x5XpW6vHFO" }

{ "_id" : 7, "value" : "BoLP2JQHjP" }

{ "_id" : 8, "value" : "GsFvaHgU9y" }

{ "_id" : 9, "value" : "WADyvzq2B9" }

{ "_id" : 10, "value" : "5pRSfw6ITN" }

{ "_id" : 11, "value" : "jxuGftDrVX" }

{ "_id" : 12, "value" : "aa299zMERE" }

{ "_id" : 13, "value" : "aQaHF6oRWh" }

{ "_id" : 14, "value" : "OvUaEjvOMZ" }

{ "_id" : 15, "value" : "mptsnyLoUy" }

{ "_id" : 16, "value" : "JGGfq12C2J" }

{ "_id" : 17, "value" : "Rl5wpMW5xW" }

{ "_id" : 18, "value" : "FEHurGtmlO" }

{ "_id" : 19, "value" : "N9Jd1aZ5nx" }

{ "_id" : 20, "value" : "LHwboHV0KL" } 



아래 코드를 기반으로 쿼리 보내는 방법을 설명한다.


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class Workload_IO {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		MongoConfigUtil.setInputURI(conf, "mongodb://dataset.data");
		MongoConfigUtil.setOutputURI(conf, "mongodb://dataset.out");

		// make 'query' object here

		MongoConfigUtil.setQuery(conf, query);

		Job job = Job.getInstance(conf, "Worklaod_IO");
		job.setJarByClass(Workload_IO.class);

		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Integer.class);
		job.setOutputValueClass(BSONWritable.class);

		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		job.setInputFormatClass(MongoInputFormat.class);
		job.setOutputFormatClass(MongoOutputFormat.class);

		job.waitForCompletion(true);

	}

	public static class Map extends Mapper<Integer, BasicDBObject, IntWritable, Text> {
		public void map(Integer key, BasicDBObject value, Context context) throws IOException, InterruptedException {
			context.write(new IntWritable(key), new Text(value.getString("value")));
		}
	}

	public static class Reduce extends Reducer<IntWritable, Text, Integer, BSONWritable> {
		public void reduce(IntWritable key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			BasicBSONObject output = new BasicBSONObject();
			BSONWritable reduceResult = new BSONWritable();

			Text val = values.iterator().next();
			if (val != null) {
				output.put("value", val.toString());
				reduceResult.setDoc(output);
				context.write(key.get(), reduceResult);
			}
		}
	}
}


MongoConfigUtil.setQuery 의 

첫번째 인자는 Configuration object

두번째 인자는 String 혹은 DBObject 가 들어간다.


위에 // make 'query' object here 라고 되어 있는 곳에 쿼리 오브젝트를 만든다.





1. String 객체를 만들어서 쿼리를 보내는 방법

String query = "{_id:3}";


String query = "{_id:{$lt:5}}";



2. BasicDBObject 를 만들어서 보내는 방법

DBObject query = new BasicDBObject("_id", 3);


DBObject query = new BasicDBObject("_id", new BasicDBObject("$lt", 3))
	.append("_id", new BasicDBObject("$gt", 18)); //이렇게 하면 앞에 3 은 무시되고 뒤에 것이 적용됨.


DBObject query = new BasicDBObject("$or",
	new BasicDBObject[] { new BasicDBObject("_id", new BasicDBObject("$lt", 2)),
	new BasicDBObject("_id", new BasicDBObject("$gte", 7)) });



3. QueryBuilder 를 만들어서 보내는 방법

DBObject query = QueryBuilder.start("_id").lessThan(5).get();



여러 방법이 있다.









Hadoop 에서 지원하는 org.apache.hadoop.fs.FileSystem을 사용하면 된다.


읽을 때


try { Path pt = new Path("hdfs:data/path/is/here");// Location of file in HDFS FileSystem fs = FileSystem.get(new Configuration()); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); String line = null; while ((line = br.readLine()) != null) {     //do something here } } catch (Exception e) { }



위 처럼 읽으면 된다.


Path 에 hdfs:data/path/is/here 라고 되어있는 부분에


hdfs 에서 원하는 데이터의 path 를 넣으면 된다.


예를 들어보자.




아래 코드는 MR 을 두 번 하는데,


첫번째 MR 에서 숫자를 하나 만들고(key 별 value 의 개수를 구함) 그 숫자를 HDFS 에 저장.


두번째 MR 에서 위에서 센 숫자를 HDFS 에서 읽어서 사용(데이터의 뒤에 붙임)한다.  


input_data

0 1

0 2

2 1

2 0

2 3

3 2

1 3


input path in HDFS

hadoop fs -put input_data /data


path : /data


(즉, hdfs:/data 에 저장됨)


아래 코드로 Workload_IO.jar 파일을 만듦. (만드는 방법은 여기서 설명하지 않음)


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.common.collect.Iterators;

public class Workload_IO {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = job1Setting(conf, args);
		job.waitForCompletion(true);

		try {
			Path pt = new Path("hdfs:" + args[0] + ".temp/part-r-00000");// Location of file in HDFS
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
			String line = null;
			while ((line = br.readLine()) != null) {
				conf.set("test", line);
			}
		} catch (Exception e) {
		}

		Job job2 = job2Setting(conf, args);
		job2.waitForCompletion(true);
	}

	public static class Map extends Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			context.write(new Text("1"), value);
		}
	}

	public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			int size = Iterators.size(values.iterator());
			context.write(NullWritable.get(), new Text(String.valueOf(size)));
		}
	}

	public static class Map2 extends Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			String temp = key.toString() + " " + context.getConfiguration().get("test");
			context.write(new Text(temp), value);
		}
	}

	public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			for (Text value : values) {
				String fromHDFS = context.getConfiguration().get("test");
				if (fromHDFS == null)
					fromHDFS = "nothing";
				String temp = value.toString() + " " + fromHDFS;
				context.write(key, new Text(temp));
			}
		}
	}

	private static Job job1Setting(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf, "Worklaod_IO");
		job.setJarByClass(Workload_IO.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[0] + ".temp"));
		return job;
	}

	private static Job job2Setting(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf, "Worklaod_IO2");
		job.setJarByClass(Workload_IO.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(Map2.class);
		job.setReducerClass(Reduce2.class);
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[0] + ".out"));
		return job;
	}
}




아래 명령어로 MR 을 돌림

hadoop jar Workload_IO.jar Workload_IO /data 



결과


0 7     2 7

0 7     1 7

1 7     3 7

2 7     3 7

2 7     0 7

2 7     1 7

3 7     2 7 


HDFS 상태

hadoop fs -ls /


-rw-r--r--   3 root supergroup         28 2019-02-15 03:54 /data

drwxr-xr-x   - root supergroup          0 2019-02-15 07:25 /data.out

drwxr-xr-x   - root supergroup          0 2019-02-15 07:24 /data.temp 



/data : 원본 데이터.

/data.temp : 첫번째 MR 이 쓴 데이터.

/data.out : 두번째 MR 이 쓴 데이터. 각 key value 에 7(/data.temp/part-r-00000 값) 이 붙어있다.





위의 코드는 사용 예제이므로 커스터마이징 해서 사용하면 된다.




참고.

https://stackoverflow.com/questions/26209773/hadoop-map-reduce-read-a-text-file/26210291







katacoda 라는 사이트가 있는데, 여기서 Docker, Git, Kubernat



https://www.katacoda.com/





위와 같은 기술들을 배울 수 있다고 함.


웹에서 직접 실습하면서!


대박 :)


즐겁게 공부합시다 ㅎㅎ




+ Recent posts