Hadoop 에서 지원하는 org.apache.hadoop.fs.FileSystem을 사용하면 된다.


읽을 때


try { Path pt = new Path("hdfs:data/path/is/here");// Location of file in HDFS FileSystem fs = FileSystem.get(new Configuration()); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt))); String line = null; while ((line = br.readLine()) != null) {     //do something here } } catch (Exception e) { }



위 처럼 읽으면 된다.


Path 에 hdfs:data/path/is/here 라고 되어있는 부분에


hdfs 에서 원하는 데이터의 path 를 넣으면 된다.


예를 들어보자.




아래 코드는 MR 을 두 번 하는데,


첫번째 MR 에서 숫자를 하나 만들고(key 별 value 의 개수를 구함) 그 숫자를 HDFS 에 저장.


두번째 MR 에서 위에서 센 숫자를 HDFS 에서 읽어서 사용(데이터의 뒤에 붙임)한다.  


input_data

0 1

0 2

2 1

2 0

2 3

3 2

1 3


input path in HDFS

hadoop fs -put input_data /data


path : /data


(즉, hdfs:/data 에 저장됨)


아래 코드로 Workload_IO.jar 파일을 만듦. (만드는 방법은 여기서 설명하지 않음)


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.common.collect.Iterators;

public class Workload_IO {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = job1Setting(conf, args);
		job.waitForCompletion(true);

		try {
			Path pt = new Path("hdfs:" + args[0] + ".temp/part-r-00000");// Location of file in HDFS
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
			String line = null;
			while ((line = br.readLine()) != null) {
				conf.set("test", line);
			}
		} catch (Exception e) {
		}

		Job job2 = job2Setting(conf, args);
		job2.waitForCompletion(true);
	}

	public static class Map extends Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			context.write(new Text("1"), value);
		}
	}

	public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			int size = Iterators.size(values.iterator());
			context.write(NullWritable.get(), new Text(String.valueOf(size)));
		}
	}

	public static class Map2 extends Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			String temp = key.toString() + " " + context.getConfiguration().get("test");
			context.write(new Text(temp), value);
		}
	}

	public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			for (Text value : values) {
				String fromHDFS = context.getConfiguration().get("test");
				if (fromHDFS == null)
					fromHDFS = "nothing";
				String temp = value.toString() + " " + fromHDFS;
				context.write(key, new Text(temp));
			}
		}
	}

	private static Job job1Setting(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf, "Worklaod_IO");
		job.setJarByClass(Workload_IO.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[0] + ".temp"));
		return job;
	}

	private static Job job2Setting(Configuration conf, String[] args) throws IOException {
		Job job = Job.getInstance(conf, "Worklaod_IO2");
		job.setJarByClass(Workload_IO.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(Map2.class);
		job.setReducerClass(Reduce2.class);
		job.setInputFormatClass(KeyValueTextInputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[0] + ".out"));
		return job;
	}
}




아래 명령어로 MR 을 돌림

hadoop jar Workload_IO.jar Workload_IO /data 



결과


0 7     2 7

0 7     1 7

1 7     3 7

2 7     3 7

2 7     0 7

2 7     1 7

3 7     2 7 


HDFS 상태

hadoop fs -ls /


-rw-r--r--   3 root supergroup         28 2019-02-15 03:54 /data

drwxr-xr-x   - root supergroup          0 2019-02-15 07:25 /data.out

drwxr-xr-x   - root supergroup          0 2019-02-15 07:24 /data.temp 



/data : 원본 데이터.

/data.temp : 첫번째 MR 이 쓴 데이터.

/data.out : 두번째 MR 이 쓴 데이터. 각 key value 에 7(/data.temp/part-r-00000 값) 이 붙어있다.





위의 코드는 사용 예제이므로 커스터마이징 해서 사용하면 된다.




참고.

https://stackoverflow.com/questions/26209773/hadoop-map-reduce-read-a-text-file/26210291




+ Recent posts