MapReduce Programming: Getting Started and Hands-On Practice


MapReduce Principles

Figure: MapReduce WordCount processing flow (the input is split, the map phase emits (word, 1) pairs, the shuffle groups the pairs by word, and the reduce phase sums each group).

Preparation

Prerequisites

IntelliJ IDEA, a Windows build of Hadoop, and Maven.

Basic IDEA configuration

Configure a China-based Maven mirror first; a previous post already covered how to do this, so it is not repeated here. Then edit the pom.xml of the new project as shown below; the main change is the content of the <dependencies> block.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>
</project>

Word Count

Prepare the data file wordcount.txt:

Dear Bear River
Car Car River
Dear Car Bear

Upload the data

Upload the test data to the Hadoop platform with the HDFS shell:

hdfs dfs -put wordcount.txt /T01

Java code

WordCount.java:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // The reducer doubles as a combiner: partial sums are computed on the
        // map side to cut down the data shuffled to the reducers.
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line on whitespace and emit (word, 1) for every token.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all counts that arrived for the same word.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

The paths passed to FileInputFormat.addInputPath and FileOutputFormat.setOutputPath in the code above can also point at local directories on your Windows machine, which makes it easy to test the job locally. To run it on the Hadoop platform, you first need to package the class into a jar.
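For local testing, a minimal runner is sketched below (the class name WordCountLocalRunner and the Windows paths are hypothetical examples; the output directory must not exist before the run). With no cluster configuration files on the classpath, the default Configuration falls back to the local file system and the local job runner, so the job executes entirely inside the IDE.

// A minimal local-test sketch; the paths are placeholders, adjust them to your machine.
public class WordCountLocalRunner {
    public static void main(String[] args) throws Exception {
        WordCount.main(new String[]{
                "file:///C:/data/wordcount.txt",   // hypothetical local input file
                "file:///C:/data/out/wordcount"    // hypothetical local output directory (must not exist)
        });
    }
}

Alternatively, the same two paths can simply be entered as Program arguments in IDEA's run configuration.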

Test run

hadoop jar WordCount.jar w.b.l.WordCount /T01/wordcount.txt /out/wordcount
hdfs dfs -cat /out/wordcount/part-r-00000

The result looks like this:

Figure: WordCount result. With the sample input above, part-r-00000 should contain: Bear 2, Car 3, Dear 2, River 2.

Counting Visits to a Campus Community Website

Data file WebSite.txt:

2016001,20190319
2016002,20190319
2016001,20190319

Upload the data

hdfs dfs -put WebSite.txt /T01

Java code

WebsiteVistors.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WebsiteVistors {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Job configuration
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf, "websitevistors");
        job.setJarByClass(WebsiteVistors.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MyReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

MyMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;
import java.io.IOException;

/**
 * Input lines look like:
 * 2016001,20190319
 * 2016002,20190319
 * 2016001,20190319
 *
 * Desired output pairs look like (2016001, 1).
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(",");
        // Note: this emits a (field, 1) pair for every comma-separated field,
        // so the date column is counted alongside the student ID.
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
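If only per-student visit counts are wanted, as the (2016001, 1) comment above suggests, the mapper can emit just the first field and ignore the date. A minimal sketch, not the original code (the class name StudentVisitMapper is hypothetical):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class StudentVisitMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Assumes the first comma-separated field is the student ID; the date is ignored.
        String[] fields = value.toString().split(",");
        context.write(new Text(fields[0]), one);
    }
}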

Test run

hadoop jar website.jar WBL.MapReduce.WebsiteVistors /T01/WebSite.txt /out/website
hdfs dfs -cat /out/website/part-r-00000

Figure: campus community website visit counts

Sorting Campus Community Website Visits by Count

Data file sort.txt:

20160101,100
20160102,200
20160103,151
20160120,120
20160121,1000

Upload the data

Upload the test data to HDFS with the HDFS shell.

hdfs dfs -put sort.txt /T01

Java code

VistSorted.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import static org.apache.hadoop.mapreduce.Job.getInstance;

public class VistSorted {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = getInstance(conf, "VistSorted");
        job.setJarByClass(VistSorted.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Map output types: the visit count becomes the key so the shuffle sorts on it
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MyReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

class MyReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // The keys arrive already sorted by visit count; swap the pair back to (date, count) for output.
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

MyMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Input lines look like 20160101,100 and should be sorted by the visit count:
 * 20160101,100
 * 20160102,200
 * 20160103,151
 * 20160120,120
 * 20160121,1000
 */
public class MyMapper extends Mapper<Object, Text, IntWritable, Text> {

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Swap the columns: the count becomes the key so MapReduce's shuffle sorts by it.
        String lines = value.toString();
        String[] words = lines.split(",");
        int keyOut = Integer.parseInt(words[1]);
        String valueOut = words[0];
        context.write(new IntWritable(keyOut), new Text(valueOut));
    }
}
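Because the visit count is the map output key, the shuffle sorts the records for us, but IntWritable keys are sorted in ascending order, so the smallest count comes out first. If descending order is wanted, a custom sort comparator can be registered on the job. A minimal sketch, assuming the hypothetical class name DescendingIntComparator:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Reverses the natural IntWritable order so larger visit counts are sorted first.
public class DescendingIntComparator extends WritableComparator {
    public DescendingIntComparator() {
        super(IntWritable.class, true); // instantiate IntWritable keys for comparison
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b); // negate the natural ascending comparison
    }
}

It would be wired into the driver with job.setSortComparatorClass(DescendingIntComparator.class).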

Test run

hadoop jar Sort.jar WBL.SORT.VistSorted /T01/sort.txt /out/sort
hdfs dfs -cat /out/sort/part-r-00000

Figure: campus community website visits sorted by count

Finding the Highest Score per Subject in a Score Table

Data file score.txt:

语文,95
数学,98
英语,99
英语,88
语文,78
数学,100

Upload the data

score.txt

hdfs dfs -put score.txt /T01

Java code

HighScore.java

/**
 * @author WBL
 * @Date 2020/4/15
 **/
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;

public class HighScore {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "HighScore");
        job.setJarByClass(HighScore.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MyReducer.java

/**
 * @author WBL
 * @Date 2020/4/15
 **/
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Keep the largest value seen for this subject (scores are assumed non-negative).
        int high = 0;
        for (IntWritable value : values) {
            if (value.get() > high) {
                high = value.get();
            }
        }
        result.set(high);
        context.write(key, result);
    }
}

MyMapper.java

/**
 * @author WBL
 * @Date 2020/4/15
 **/
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;
import java.io.IOException;

/**
 * Input lines look like:
 * 语文,95
 * 数学,98
 * 英语,99
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Emit (subject, score) for every line.
        String line = value.toString();
        String[] words = line.split(",");
        context.write(new Text(words[0]), new IntWritable(Integer.parseInt(words[1])));
    }
}

Test run

hadoop jar HighScore.jar WBL.SCORE.HighScore /T01/score.txt /out/score
hdfs dfs -cat /out/score/part-r-00000

Figure: highest score per subject

Merging and Deduplicating Data from Two Files

The data files XX.txt and YY.txt are placed in a local directory named combiner; their contents are as follows.

XX.txt

20160102
20160103
20160105

YY.txt

20160101
20160102
20160106

Upload the data

hdfs dfs -put combiner/ /T01

Java code

Combiner.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Note: "Combiner" is only the name of this driver class; the job does not set
// a Hadoop combiner. Deduplication happens because identical keys are grouped
// into a single reduce call.
public class Combiner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Combiner");
        job.setJarByClass(Combiner.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MyMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // The whole input line is the key; duplicate lines end up in the same reduce group.
        context.write(value, new IntWritable(1));
    }
}

MyReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Write each distinct line exactly once; the values are ignored.
        context.write(key, NullWritable.get());
    }
}
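Since only the keys matter for deduplication, the IntWritable value carries no information. A slightly leaner variant is sketched below (not the original code; the class names DedupMapper and DedupReducer are hypothetical, and the driver would then need job.setMapOutputValueClass(NullWritable.class)):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Shown together for brevity; each class would normally live in its own file.
class DedupMapper extends Mapper<Object, Text, Text, NullWritable> {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get()); // the whole line is the key, the value is empty
    }
}

class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get()); // one output line per distinct input line
    }
}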

Test run

hadoop jar ComBiner.jar WBL.ComBiner.Combiner /T01/combiner /out/combiner
hdfs dfs -cat /out/combiner/part-r-00000

Figure: merged and deduplicated result. With the two sample files above, part-r-00000 should list 20160101, 20160102, 20160103, 20160105, and 20160106, one per line.
