How MapReduce Works

A MapReduce job runs in three stages: a map phase that turns each input record into key/value pairs, a shuffle that groups and sorts those pairs by key, and a reduce phase that aggregates the values for each key.

The MapReduce WordCount Processing Flow

For WordCount specifically, the mapper emits (word, 1) for every token, the shuffle groups all the 1s belonging to the same word, and the reducer sums them into a final count per word.
Getting Started

Development prerequisites: IntelliJ IDEA, a Windows build of Hadoop, and Maven.
Basic IDEA Configuration

You need to configure a domestic Maven mirror; an earlier post covered how, so the steps are not repeated here. You also need to edit the pom.xml of the newly created project as shown below; the main change is the content of <dependencies>.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>
</project>
```
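For quick reference, the domestic mirror lives in Maven's settings.xml rather than the pom; a minimal sketch, assuming the commonly used Aliyun mirror (the earlier post has the full walkthrough):

```xml
<!-- Sketch of a <mirrors> entry in ~/.m2/settings.xml; the Aliyun
     repository is one common choice of domestic mirror. -->
<mirrors>
  <mirror>
    <id>aliyun</id>
    <mirrorOf>central</mirrorOf>
    <url>https://maven.aliyun.com/repository/public</url>
  </mirror>
</mirrors>
```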
Word Count

Prepare the data file wordcount.txt:
```
Dear Bear River
Car Car River
Dear Car Bear
```
Upload the data

Use the HDFS shell to upload the test data to the Hadoop cluster:

```
hdfs dfs -put wordcount.txt /T01
```
Java code: WordCount.java
```java
// The package must match the fully qualified class name used in the
// `hadoop jar` command below (w.b.l.WordCount).
package w.b.l;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // Summing is associative and commutative, so the reducer
        // can also run as a combiner on the map side.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // Emits (token, 1) for every whitespace-separated token in a line.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Sums all the 1s collected for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
```
The paths passed to FileInputFormat.addInputPath and FileOutputFormat.setOutputPath above can also be local Windows paths, which lets you test the job on your own machine. To run it on the Hadoop cluster, you need to build a jar first. A local-mode sketch follows.
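As a sketch of a local run, assuming hypothetical paths under D:/mapreduce (adjust them to your machine; running locally on Windows also requires the winutils binaries from the Windows build of Hadoop), the two path lines in main would become:

```java
// Hypothetical local input file; change to wherever wordcount.txt lives.
FileInputFormat.addInputPath(job, new Path("D:/mapreduce/wordcount.txt"));
// Hypothetical local output directory; it must not exist before the job runs.
FileOutputFormat.setOutputPath(job, new Path("D:/mapreduce/out"));
```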
Run and verify

```
hadoop jar WordCount.jar w.b.l.WordCount /T01/wordcount.txt /out/wordcount
hdfs dfs -cat /out/wordcount/part-r-00000
```
With the sample data above, the result should be (key and count are tab-separated):

```
Bear	2
Car	3
Dear	2
River	2
```
Counting Visits to a Campus Community Site

The data file WebSite.txt (each record is a student ID followed by a date):
```
2016001,20190319
2016002,20190319
2016001,20190319
```
Upload the data:

```
hdfs dfs -put WebSite.txt /T01
```
Java code: WebsiteVistors.java
```java
// The package must match the class name WBL.MapReduce.WebsiteVistors
// used in the `hadoop jar` command below.
package WBL.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WebsiteVistors {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "websitevistors");
        job.setJarByClass(WebsiteVistors.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
MyReducer.java
```java
package WBL.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Sums the 1s emitted by the mapper into a total per key.
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
```
MyMapper.java
```java
package WBL.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record is "studentID,date". Emitting every field with a
        // count of 1 means the output mixes per-student and per-date totals.
        String line = value.toString();
        String[] words = line.split(",");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
```
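Because the mapper emits both fields of each record, the output mixes per-student and per-date totals. If only per-student counts are wanted, a variant map body (my own sketch, not part of the original code) would emit just the student ID:

```java
// Sketch: key on field 0 (the student ID) only, so the reducer
// produces exactly one visit total per student.
String[] fields = value.toString().split(",");
context.write(new Text(fields[0]), new IntWritable(1));
```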
Run and verify

```
hadoop jar website.jar WBL.MapReduce.WebsiteVistors /T01/WebSite.txt /out/website
hdfs dfs -cat /out/website/part-r-00000
```
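Given the three sample records, and since the mapper counts both fields, the output should read (tab-separated):

```
2016001	2
2016002	1
20190319	3
```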
Sorting Campus Site Visit Counts

The data file sort.txt (each record appears to be a date followed by a visit count):
```
20160101,100
20160102,200
20160103,151
20160120,120
20160121,1000
```
Upload the data

Use the HDFS shell to upload the test data to HDFS:

```
hdfs dfs -put sort.txt /T01
```
Java code: VistSorted.java
```java
// The package must match the class name WBL.SORT.VistSorted
// used in the `hadoop jar` command below.
package WBL.SORT;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class VistSorted {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "VistSorted");
        job.setJarByClass(VistSorted.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The visit count is the map output key, so the shuffle's
        // ascending key sort orders the records by count.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
MyReducer.java
```java
package WBL.SORT;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Keys arrive in ascending count order; swap key and value back
// so each output line reads "date  count".
public class MyReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
```
MyMapper.java
```java
package WBL.SORT;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<Object, Text, IntWritable, Text> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record is "date,count"; emit (count, date) so the
        // shuffle sorts the records by count.
        String lines = value.toString();
        String[] words = lines.split(",");
        int keyOut = Integer.parseInt(words[1]);
        String valueOut = words[0];
        context.write(new IntWritable(keyOut), new Text(valueOut));
    }
}
```
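The shuffle sorts IntWritable keys in ascending order, which is what produces the ascending result here. If descending order were needed, a custom sort comparator could be registered in the VistSorted driver via job.setSortComparatorClass(DescendingIntComparator.class); the class below is my own sketch, not part of the original code:

```java
package WBL.SORT;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Inverts the natural IntWritable order so larger counts come first.
public class DescendingIntComparator extends WritableComparator {
    public DescendingIntComparator() {
        super(IntWritable.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b);
    }
}
```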
Run and verify

```
hadoop jar Sort.jar WBL.SORT.VistSorted /T01/sort.txt /out/sort
hdfs dfs -cat /out/sort/part-r-00000
```
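Given the sample data, the dates should come back ordered by visit count, smallest first:

```
20160101	100
20160120	120
20160103	151
20160102	200
20160121	1000
```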
Finding the Highest Score per Subject

The data file score.txt (each record is a subject and a score):
```
语文,95
数学,98
英语,99
英语,88
语文,78
数学,100
```
Upload the data:

```
hdfs dfs -put score.txt /T01
```
Java code: HighScore.java
```java
// The package must match the class name WBL.SCORE.HighScore
// used in the `hadoop jar` command below.
package WBL.SCORE;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HighScore {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "HighScore");
        job.setJarByClass(HighScore.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
MyReducer.java
```java
package WBL.SCORE;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Keeps only the highest score seen for each subject.
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int high = 0;
        for (IntWritable value : values) {
            if (value.get() > high) {
                high = value.get();
            }
        }
        result.set(high);
        context.write(key, result);
    }
}
```
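Since taking a maximum is associative and commutative, this reducer could also be registered as a combiner in the HighScore driver to shrink the shuffle; one extra line (my suggestion, not in the original driver):

```java
// Safe because max(max(a, b), c) == max(a, b, c).
job.setCombinerClass(MyReducer.class);
```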
MyMapper.java
```java
package WBL.SCORE;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each record is "subject,score"; emit (subject, score).
        String line = value.toString();
        String[] words = line.split(",");
        context.write(new Text(words[0]), new IntWritable(Integer.parseInt(words[1])));
    }
}
```
Run and verify

```
hadoop jar HighScore.jar WBL.SCORE.HighScore /T01/score.txt /out/score
hdfs dfs -cat /out/score/part-r-00000
```
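Given the sample data, each subject maps to its highest score:

```
数学	100
英语	99
语文	95
```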
Merging and Deduplicating Data from Two Files

The data files XX.txt and YY.txt are placed in a combiner directory; their contents are:
XX.txt
```
20160102
20160103
20160105
```
YY.txt
```
20160101
20160102
20160106
```
Upload the data:

```
hdfs dfs -put combiner/ /T01
```
Java code: Combiner.java
```java
// The package must match the class name WBL.ComBiner.Combiner
// used in the `hadoop jar` command below.
package WBL.ComBiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Combiner {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Combiner");
        job.setJarByClass(Combiner.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
MyMapper.java
```java
package WBL.ComBiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // The whole line is the key; the 1 is a throwaway placeholder.
        // The shuffle then groups identical lines together.
        context.write(value, new IntWritable(1));
    }
}
```
MyReducer.java
```java
package WBL.ComBiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each distinct line arrives exactly once as a key; write it with
        // an empty NullWritable value, which deduplicates the merged input.
        context.write(key, NullWritable.get());
    }
}
```
Run and verify

```
hadoop jar ComBiner.jar WBL.ComBiner.Combiner /T01/combiner /out/combiner
hdfs dfs -cat /out/combiner/part-r-00000
```
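Given the two input files, the merged, deduplicated output should be:

```
20160101
20160102
20160103
20160105
20160106
```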