java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java MapReduce词频统计

使用Java实现MapReduce词频统计示例代码

作者:辛小贝达尔比

这篇文章主要介绍了使用Java实现MapReduce词频统计的相关资料,通过词频统计示例来展示MapReduce的运行机制,涵盖了Mapper和Reducer的实现,并说明了如何配置和执行MapReduce作业,需要的朋友可以参考下

前言

在这篇博客中,我们将学习如何使用 Java 实现 Hadoop 的 MapReduce 框架,并通过一个词频统计(WordCount)的例子,来了解 MapReduce 的工作原理。MapReduce 是一种流行的大规模数据处理模式,广泛应用于分布式计算环境中。

一、正文

1. 代码结构

我们将在以下三个文件中实现 MapReduce 的核心功能:

接下来我们将逐一分析这些代码。

2. Map.java——Mapper 实现

首先看下 Mapper 类的代码实现:

package demo1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;


//public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1); // 计数器
    private Text word = new Text(); // 存储当前处理的单词

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 将每行的文本数据分割成单词,可使用split()实现相同功能
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken()); // 获取下一个单词
            context.write(word, one); // 输出单词及其计数1
        }
    }
}

功能解读:

注意事项:

   StringTokenizer 用于分割每行文本,将其分割成单词。

   context.write(word, one) 将结果输出到 Reducer 处理时会被聚合。每遇到一个相同的单词,后面会将其所有的 1 聚合成总和。

Mapper类的泛型定义

典型的 Mapper 类定义如下:

public class Mapper&lt;KEYIN, VALUEIN, KEYOUT, VALUEOUT&gt;

这表示 Mapper 是一个泛型类,带有四个类型参数。每个参数对应 Mapper 任务中的不同数据类型。让我们逐个解释这些泛型参数的含义:

3. Reduce.java——Reducer 实现

接下来我们实现 Reducer:

package demo1;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get(); // 累加单词出现的次数
        }
        result.set(sum); // 设置聚合后的结果
        context.write(key, result); // 输出单词及其总次数
    }
}

功能解读:

注意事项:

  for (IntWritable val : values) 遍历所有的计数值,并累加得到单词的总次数。

      结果会输出为 <单词, 出现次数>,存储到最终的输出文件中。

Reducer类的泛型定义

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

Reducer 类带有四个泛型参数,每个参数对应 Reducer 任务中的不同数据类型。

4. WordCount.java——作业配置与执行

最后,我们编写主程序,用于配置和启动 MapReduce 作业:

package demo1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(); // 配置项
        Job job = Job.getInstance(conf, "word count"); // 创建一个新作业

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(1); // 输入输出路径检查
        }

        job.setJarByClass(WordCount.class); // 设置主类
        job.setMapperClass(Map.class); // 设置Mapper类
        job.setReducerClass(Reduce.class); // 设置Reducer类

        // 设置Map和Reduce输出的键值对类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 输入输出路径
        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 输入路径
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 输出路径

        // 提交作业,直到作业完成
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

功能解读:

注意事项:

   GenericOptionsParser 用于解析命令行输入,获取输入和输出路径。

         提交作业后,Hadoop 框架会根据配置自动运行 Mapper 和 Reducer,并将结果输出到指定的路径。

二、知识回顾与补充

数据的偏移量?

数据的偏移量,即 LongWritable key,是 MapReduce 程序中 Mapper 输入的键,它表示输入数据文件中每行文本的起始字节位置

例:假设我们有一个文本文件:

0: Hello world
12: Hadoop MapReduce
32: Data processing

        每行的前面的数字(0, 12, 32)就是对应行在文件中的偏移量,表示从文件开头到该行起始字节的距离。LongWritable 类型的 key 就表示这个偏移量。

        在 Mapper 中,输入是以 <偏移量, 文本行> 这样的键值对形式提供的。虽然偏移量在词频统计任务中不重要,但在某些应用中,如文件处理、日志解析时,偏移量可以帮助追踪数据的位置。

Context

Context 是 MapReduce 框架中 Mapper 和 Reducer 中非常重要的一个类,它提供了与框架进行交互的方法。

Context 的主要作用

Iterable<IntWritable> values是什么类型?

在 Reducer 阶段,Iterable<IntWritable> values 表示与同一个键(即单词)相关联的所有 IntWritable 值的集合。

其他遍历方法

除了 for (IntWritable val : values) 这种增强型 for 循环,我们还可以使用 Iterable遍历

Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
    IntWritable val = iterator.next();
    sum += val.get(); // 处理每个值
}

Iterator 提供 hasNext() 方法检查是否有更多元素,next() 方法返回当前元素并指向下一个。

Configuration conf = new Configuration() 的作用是什么?

Configuration 类用于加载和存储 Hadoop 应用程序运行时的配置信息,它是一个 Hadoop 配置系统的核心组件,能够让你定义和访问一些运行时参数。每个 MapReduce 作业都依赖 Configuration 来初始化作业配置。

Configuration 的具体作用:

为什么需要 Configuration

在 MapReduce 应用中,集群的规模较大,许多配置参数(如文件系统路径、任务调度器配置等)都存储在外部的配置文件中,Configuration 类可以动态加载这些配置,避免硬编码。

用 split() 方法实现默认分隔符的分割

如果想实现类似于 StringTokenizer 的默认行为(用空白字符分割),可以使用正则表达式 \\s+,它表示匹配一个或多个空白字符(与 StringTokenizer 的默认行为一样)。

示例代码:

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 使用 split() 方法来分割字符串,使用空白字符作为分隔符
        String[] tokens = value.toString().split("\\s+");

        // 遍历分割后的标记
        for (String token : tokens) {
            word.set(token);
            context.write(word, one);
        }
    }

IntWritable

IntWritable 是 Hadoop 提供的一个类,属于 org.apache.hadoop.io 包。它是 Hadoop 框架中用来封装 int 类型的数据的一个可序列化(writable)的包装类。

在 Hadoop MapReduce 中,所有数据类型都需要实现 Writable 和 Comparable 接口,以便能够通过网络在节点之间传输。IntWritable 作为 Hadoop 中的基本数据类型之一,提供了一些便利方法来存储和处理 int 数据。

IntWritable 类的作用:

在 MapReduce 中,Hadoop的数据类型都需要实现 Writable 接口,这样它们就可以在分布式系统中通过网络传输。IntWritable 封装了一个 Java 的 int 类型,用于 Hadoop 的输入输出键值对。

主要的用途

如何使用 IntWritable

IntWritable 提供了构造方法和一些方法来设置和获取 int 值。

1. 创建 IntWritable 对象

可以通过构造方法直接创建对象:

// 创建一个默认值为 0 的 IntWritable 对象
IntWritable writable1 = new IntWritable();

// 创建一个值为 10 的 IntWritable 对象
IntWritable writable2 = new IntWritable(10);

2. 设置值和获取值

可以通过 set() 方法来设置值,通过 get() 方法来获取 IntWritable 封装的 int 值。

IntWritable writable = new IntWritable();

// 设置值为 42
writable.set(42);

// 获取值
int value = writable.get(); // value == 42

3. 在 MapReduce 中使用

在 MapReduce 任务中,IntWritable 通常被用于输出的值。例如,在计数器的 MapReduce 程序中,常将 IntWritable 的值设置为 1,表示一个单词的出现次数。

示例:MapReduce 中的 IntWritable 使用

public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1); // 值为 1 的 IntWritable
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("\\s+");

        // 遍历每个单词并写入 context
        for (String token : tokens) {
            word.set(token);
            context.write(word, one);  // 输出 <单词, 1>
        }
    }
}

在这个 MapReduce 例子中:

总结:

IntWritable 之所以在 Hadoop 中使用,而不是原生的 int 类型,是因为:

        Hadoop 需要能通过网络传输的类型,IntWritable 实现了 Writable 接口,可以序列化和反序列化。

  IntWritable 实现了 Comparable 接口,因此可以在 Hadoop 的排序操作中使用。

Job job = Job.getInstance(conf, "word count"); 这是什么意思?

Job job = Job.getInstance(conf, "word count");

这行代码创建并配置一个新的 MapReduce 作业实例。

在Driver中,为什么只设置输出的键值对类型?不设置输入呢?

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

        1. 输入数据的键值对类型

        是由 InputFormat(如 TextInputFormat)决定的,默认读取每行数据的偏移量和内容作为键值对传递给 Mapper。 

        Hadoop MapReduce 使用 InputFormat 类来读取输入数据文件。默认的输入格式是 TextInputFormat,它会自动将输入文件解析成键值对形式,而你不需要在 Driver 中显式指定输入的类型。

所以,Mapper 的输入键值对类型已经由 InputFormat 控制,不需要你在 Driver 中手动指定。

        2. 最终输出的键值对类型

        需要你在 Driver 中显式设置,因为这是写入到 HDFS 中的数据类型。

 setOutputKeyClass 和 setOutputValueClass 的作用

        在 Driver 中,你需要明确指定的是 最终输出结果的键值对类型,即 Reducer 输出的键值对类型,因为这是写入到 HDFS 中的数据类型。

这两项设置明确告诉 Hadoop,最后存储在 HDFS 中的结果文件中,键和值分别是什么类型。

总结

到此这篇关于使用Java实现MapReduce词频统计的文章就介绍到这了,更多相关Java MapReduce词频统计内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文