MapReduce编程实践

MapReduce编程实践编程环境准备 要在 Eclipse 上编译和运行 MapReduce 程序 需要安装 hadoop eclipse plugin 可下载 Github 上的 hadoop2x eclipse plugin 下载后 将 release 中的 hadoop eclipse kepler plugin 2 6 0 jar 复制到 Eclipse 安装目录的 plugins 文件夹中 运行 ecl

编程环境准备:

  1. 要在 Eclipse 上编译和运行 MapReduce 程序,需要安装 hadoop-eclipse-plugin,可下载 Github 上的 hadoop2x-eclipse-plugin。
  2. 下载后,将 release 中的 hadoop-eclipse-kepler-plugin-2.6.0.jar 复制到 Eclipse 安装目录的 plugins 文件夹中,运行 eclipse -clean 重启 Eclipse 即可(添加插件后只需要运行一次该命令,以后按照正常方式启动就行了)。
  3. 打开eclipse,进行hadoop插件配置。
    1. 选择Window菜单下的Preference。
    2. 然后选择Hadoop Map/Reduce,选择hadoop的安装目录,并确认配置。
    3. 在输出窗口下又一个蓝色大象,点击可进行hadoop环境配置。
    4. 按如下进行设置:

      其中,Localtion name可以随意填写,端口号则为9000。还有很多配置参数,为了方便,直接先创建WordCount的MapReduce工程,然后将/usr/local/hadoop/etc/hadoop中的配置文件core-site.xml ,hdfs-site.xml以及 log4j.properties 复制到 WordCount 项目下的 src 文件夹(~/workspace/WordCount/src)中:复制完成后,需要对工程文件进行刷新。

      这样在运行MapReduce作业时,就会使用配置文件中的配置参数。

    5. 然后就可以进行开发了。
  4. 注:HDFS 中的内容变动后,Eclipse 不会同步刷新,需要右键点击 Project Explorer中的 MapReduce Location,选择 Refresh,才能看到变动后的文件。

(1)编程实例–WordCount:

功能:对指定输入的文件进行单词个数统计,然后输出到指定文件夹中。

程序代码:

import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; public class WordCount{ 
    public WordCount(){ 
    } public static void main(String[] args) throws Exception{ 
    Configuration conf = new Configuration(); //指定输入文件路径input和输出文件路径output String[] otherArgs = new String[]{ 
   "/input","/output"}; if(otherArgs.length < 2){ 
    System.err.println("没有输入输出路径!"); System.exit(2); } / * Job:它允许用户配置作业、提交作业、控制其执行和查询状态。 * SET方法仅在提交作业之前工作, * 之后它们将引发非法LealEtExeExchange。 */ //创建没有特定集群和给定作业名的新作业。 //只有当需要时,才会从CONF参数创建一个集群。 //作业生成配置的副本,以便任何必要的内部修改不反映传入参数。  Job job = Job.getInstance(conf, "word count"); //通过找到给定类的来源来设置jar job.setJarByClass(WordCount.class); job.setMapperClass(WordCount.TokenizerMapper.class); job.setCombinerClass(WordCount.IntSumReducer.class); job.setReducerClass(WordCount.IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(int i = 0; i < otherArgs.length - 1; ++i) { 
    //FileInputFormat:基于文件的输入格式的基类 //添加输入文件路径 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } //FileOutputFormat:基于文件的输出格式的基类 //添加输出文件路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true)?0:1); } / * Reduce:减少一组中间值,这些值共享一组较小的值。 */ public static class IntSumReducer extends Reducer <Text, IntWritable, Text, IntWritable>{ 
    private IntWritable result = new IntWritable(); public IntSumReducer(){ 
    } public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException{ 
    int sum = 0; IntWritable val = null; for(Iterator it = values.iterator(); it.hasNext(); sum += val.get()){ 
    val = (IntWritable) it.next(); } this.result.set(sum); context.write(key, this.result); } } / * Mapper:将输入的键/值对映射到一组中间键/值对 * 映射是将输入记录转换为中间记录的单个任务。转换后的 * 中间记录不需要与输入记录相同。给定的输入对可以映射到零或多个输出对。 */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ 
    //IntWritable:一个可写的用于int型的。 //设置一个变量one为1. private static final IntWritable one = new IntWritable(1); //Text:该类使用标准UTF8编码存储文本。 private Text word = new Text(); public TokenizerMapper(){ 
    } //map():为输入分割中的每个键/值对调用一次。 //大多数应用程序应该重写这个,但是默认是标识函数。  public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException{ 
    // StringTokenizer这个类主要是把一个字符串按某个标记分段, //默认的情况下的分割符是空格 StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ 
    this.word.set(itr.nextToken()); context.write(this.word, one); } } } } 

(2)编程实例-求平均值:

功能:计算学生的平均成绩,每个文件包括所有的学生成绩,格式为 姓名 成绩,有多少个科目,就有多少个输入文件。

import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //import org.apache.hadoop.util.GenericOptionsParser; / * 计算学生的平均成绩 * 学生成绩以每科一个文件输入 * 文件内容:姓名 成绩 * 例如:  小明  80 */ public class AverageScore { 
    public AverageScore(){ 
    } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); //设置文件输入输出路径 String[] otherArgs = new String[]{ 
   "/input1","/output1"}; //可以用来读取输入输出文件参数,这里采用上一行代码,手动设置路径 //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length < 2){ 
    System.err.println("请输入至少两个文件!"); System.exit(2); } //设置工作参数 Job job = Job.getInstance(conf,"Average Score"); job.setJarByClass(AverageScore.class); job.setMapperClass(AverageScore.AverageMapper.class); job.setCombinerClass(AverageScore.AverageReduce.class); job.setReducerClass(AverageScore.AverageReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FloatWritable.class); //输入文件路径 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //输出文件路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } /* * map():将每个输入文件,将姓名和成绩分割开。 */ public static class AverageMapper extends Mapper<Object, Text, Text, FloatWritable>{ 
    public AverageMapper(){ 
    } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
    String line = value.toString(); //按行进行划分 StringTokenizer tokens = new StringTokenizer(line,"\n"); while(tokens.hasMoreTokens()){ 
    String tmp = tokens.nextToken(); //按空格进行划分 StringTokenizer sz = new StringTokenizer(tmp); String name = sz.nextToken(); float score = Float.valueOf(sz.nextToken()); Text outName = new Text(name); FloatWritable outScore = new FloatWritable(score); context.write(outName, outScore); } } } / * reduce():将同一个学的各科成绩加起来,求平均数 */ public static class AverageReduce extends Reducer<Text, FloatWritable, Text, FloatWritable>{ 
    public AverageReduce(){ 
    } protected void reduce(Text key, Iterable<FloatWritable> value, Context context) throws IOException, InterruptedException { 
    float sum = 0;//刚开始总分为0 int count = 0;//记录有几科成绩 Iterator<FloatWritable> it = value.iterator();//遍历成绩 //获取各科成绩进行累加 while(it.hasNext()){ 
    sum += it.next().get(); count++; } //求出平均值 FloatWritable averageScore = new FloatWritable(sum/count); //写人文件 context.write(key,averageScore); } } } 

(3)编程实例-数据去重:

功能:数据重复,map中每一行做为一个key,value值任意,经过shuffle之后输入到reduce中利用key的唯一性直接输出key。

数据:

file1.txt

file2.txt

源代码:

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //import org.apache.hadoop.util.GenericOptionsParser;  / * 数据去重 */ public class Dedup { 
    public static class MyMapper extends Mapper<Object, Text, Text, Text>{ 
    @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
    //value:为每行数据 context.write(value, new Text("")); } } public static class MyReducer extends Reducer<Text, Text, Text, Text>{ 
    @Override protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException { 
    context.write(key, new Text("")); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ 
    Configuration conf = new Configuration(); //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  String[] otherArgs = new String[]{ 
   "/input2","/output2"}; if(otherArgs.length<2){ 
    System.out.println("parameter errors!"); System.exit(2); } Job job = Job.getInstance(conf, "Dedup"); job.setJarByClass(Dedup.class); job.setMapperClass(MyMapper.class); job.setCombinerClass(MyReducer.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)?0:1); } } 

程序运行后输入文件为:


以上内容为听华为大数据培训课程和大学MOOC上厦门大学 林子雨的《大数据技术原理与应用》课程而整理的笔记。

大数据技术原理与应用: https://www.icourse163.org/course/XMU-


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226081.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午7:53
下一篇 2026年3月17日 上午7:53


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号