1 实验目的
了解MapReduce中“Map”和“Reduce”基本概念和主要思想;掌握基本的MapReduce API编程,并实现合并、去重、排序等基本功能;
2 实验环境
实验平台:基于实验一搭建的虚拟机Hadoop大数据实验平台上的MapReduce集群;
编程语言:JAVA
3 实验内容
- 编程实现文件合并和去重操作;对于每行至少具有三个字段的两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。
- 编写程序实现对输入文件的排序;现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。
- 对以上两个任务撰写实验报告,并提交相关实验代码。
4 任务一
编程实现文件合并和去重操作;对于每行至少具有三个字段的两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。
4.1 编写代码Merge.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class Merge {
// Map类,继承自Mapper类--一个抽象类
public static class Map extends Mapper<Object, Text, Text, Text> {
private static Text text = new Text();
// 重写map方法
public void map(Object key, Text value, Context content) throws IOException, InterruptedException {
text = value;
// 底层通过Context content传递信息(即key value)
content.write(text, new Text(""));
}
}
// Reduce类,继承自Reducer类--一个抽象类
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 对于所有的相同的key,只写入一个,相当于对于所有Iterable<Text> values,只执行一次write操作
context.write(key, new Text(""));
}
}
// main方法
public static void main(String[] args) throws Exception {
final String INPUT_PATH = "ghb_lab3_input";// 输入目录
final String OUTPUT_PATH = "ghb_lab3_output";// 输出目录
Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "hdfs://localhost:9000");
Path path = new Path(OUTPUT_PATH);
// 加载配置文件
FileSystem fileSystem = path.getFileSystem(conf);
// 输出目录若存在则删除
if (fileSystem.exists(new Path(OUTPUT_PATH))) {
fileSystem.delete(new Path(OUTPUT_PATH), true);
}
Job job = Job.getInstance(conf, "Merge");
job.setJarByClass(Merge.class);
job.setMapperClass(Map.class); // 初始化为自定义Map类
job.setReducerClass(Reduce.class); // 初始化为自定义Reduce类
job.setOutputKeyClass(Text.class); // 指定输出的key的类型,Text相当于String类
job.setOutputValueClass(Text.class); // 指定输出的Value的类型,Text相当于String类
FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); // FileInputFormat指将输入的文件(若大于64M)进行切片划分,每个split切片对应一个Mapper任务
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4.2 传输代码和实验数据
打开WinSCP,将Merge.java及实验数据复制到虚拟机cluster1的/home/hadoop路径下。
在cluster1中使用用户hadoop,输入以下命令
cd ~
// 创建输入文件夹
mkdir ghb_lab3_input
// 创建输出文件夹
mkdir ghb_lab3_output
// 将数据文件移动到输入文件夹内
mv 附录实验数据一.csv ghb_lab3_output/
mv 附录实验数据二.csv ghb_lab3_output/
// 查看文件是否移动成功
ls ghb_lab3_output
4.3 编译运行
// 构造新的命令ghb_javac,注意下面是一行,不要写成多行
alias ghb_javac="javac -cp /usr/local/hadoop-2.6.5/share/hadoop/common/*:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/*:"
// 构造新的命令ghb_java,注意下面是一行,不要写成多行
alias ghb_java="java -cp /usr/local/hadoop-2.6.5/share/hadoop/common/*:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/*:"
// 编译
ghb_javac Merge.java
// 运行
ghb_java Merge
4.4 查看输出结果
ls ghb_lab3_output/
cat ghb_lab3_output/part-r-00000
5 任务二
编写程序实现对输入文件的排序;现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。
5.1 编写代码Sort.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.FileSystem;
public class Sort {
public static class Partition extends Partitioner<IntWritable, IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
int MaxNumber = 65223;
int bound = MaxNumber / numPartitions + 1;
int keynumber = key.get();
for (int i = 0; i < numPartitions; i++) {
if (keynumber < bound * i && keynumber >= bound * (i - 1))
return i - 1;
}
return 0;
}
}
public static class SortMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
private static IntWritable data = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
public static class SortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private static IntWritable linenum = new IntWritable(1);
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(linenum, key);
linenum = new IntWritable(linenum.get() + 1);
}
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
final String INPUT_PATH = "ghb_lab3_input2";// 输入目录
final String OUTPUT_PATH = "ghb_lab3_output2";// 输出目录
Configuration conf = new Configuration();
// conf.set("fs.defaultFS", "hdfs://localhost:9000");
Path path = new Path(OUTPUT_PATH);
// 加载配置文件
FileSystem fileSystem = path.getFileSystem(conf);
// 输出目录若存在则删除
if (fileSystem.exists(new Path(OUTPUT_PATH))) {
fileSystem.delete(new Path(OUTPUT_PATH), true);
}
Job job = new Job(conf, "Sort");
job.setJarByClass(Sort.class);
job.setMapperClass(SortMapper.class);
job.setPartitionerClass(Partition.class);
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); // FileInputFormat指将输入的文件(若大于64M)进行切片划分,每个split切片对应一个Mapper任务
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
5.2 传输代码和实验数据
打开WinSCP,将Sort.java及实验数据复制到虚拟机cluster1的/home/hadoop路径下。
在cluster1中使用用户hadoop,输入以下命令
cd ~
// 创建输入文件夹
mkdir ghb_lab3_input2
// 创建输出文件夹
mkdir ghb_lab3_output2
// 将数据文件移动到输入文件夹内
mv 附录实验数据三.txt ghb_lab3_input2/
mv 附录实验数据四.txt ghb_lab3_input2/
// 查看文件是否移动成功
ls ghb_lab3_input2
5.3 编译运行
// 编译
ghb_javac Sort.java
// 运行
ghb_java Sort
5.4 查看输出结果
ls ghb_lab3_output2/
cat ghb_lab3_output2/part-r-00000