博客
关于我
TF-IDF mapreduce实现
阅读量:350 次
发布时间:2019-03-04

本文共 14379 字,大约阅读时间需要 47 分钟。

前言:边写的时候我有修改一些代码,可能前后会有点对不上

文章目录

TF

map

输入:key:当前行偏移位置, value:当前行内容

输出:key: word:workname,val: 1
hadoop的context.write,必须保证有值写入,不能是null,如果是null会让map输出和reduce的输入不匹配而造成程序运行错误。
setup:
在开始前执行且仅执行一次
获取每个文件所属的作品名,避免在map中重复计算

public static class TFMapper extends Mapper
{ private final Text one = new Text("1"); private Text word = new Text(); private String workName = ""; @Override protected void setup(Mapper
.Context context) throws IOException, InterruptedException { InputSplit inputSplit = context.getInputSplit(); String fileFullName = ((FileSplit)inputSplit).getPath().toString(); String[] nameSegments = fileFullName.split("/"); workName = nameSegments[nameSegments.length - 1]; int idx = workName.indexOf("第"); if (idx != -1){ workName = workName.substring(0, idx); } } @Override protected void map(Object key, Text value, Mapper
.Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { word.set(String.join(":", tokenizer.nextToken(), workName)); context.write(word, one); } } }

combiner

对map的结果做一次合并,但是hadoop文档也有说明,即使设置了combiner它也不是一定会被执行的

public static class TFCombiner extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { if (values == null) { return; } int sumCount = 0; for (Text val : values) { sumCount += Integer.parseInt(val.toString()); } context.write(key, new Text(String.valueOf(sumCount))); } }

Partitioner

分区数应该和你分配的reducer一样数目(小于也可以),但不能大于,不然就会出现像取模得到5,但没有5号reducer的情况;

分区数默认为1,你可以自己设置

public static class TFPartitioner extends Partitioner
{ @Override public int getPartition(Text key, Text value, int numPartitions) { String fileName = key.toString().split(":")[1]; numPartitions = taskNum; return Math.abs((fileName.hashCode() * 127) % numPartitions); } }

reducer

key:word:workname

value:tf
reducer和combiner的逻辑基本一致,combiner的输出作为reducer的输入,在某些情况下,你要注意combiner不应该影响你想要呈现的真实结果。

public static class TFReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { if (values == null) { return; } int sumCount = 0; for (Text val : values) { sumCount += Integer.parseInt(val.toString()); } context.write(key, new Text(String.valueOf(sumCount))); } }

MAIN

public static void main(String[] args) {           Configuration conf = new Configuration();        try {               Job job = Job.getInstance(conf);             job.setJobName("TF-job");            job.setJarByClass(TFMR.class);            job.setMapperClass(TFMR.TFMapper.class);                job.setCombinerClass(TFMR.TFCombiner.class);            job.setPartitionerClass(TFMR.TFPartitioner.class);              job.setNumReduceTasks(taskNum);            job.setReducerClass(TFMR.TFReducer.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            String inputPath = args[0];            String outputPath = args[1];            FileInputFormat.addInputPath(job, new Path(inputPath));            FileOutputFormat.setOutputPath(job, new Path(outputPath));            job.waitForCompletion(true);        } catch (Exception e) {               e.printStackTrace();        }    }

TF的测试

实验要求是按第字分解获取作品名,我自己写了个很小的样例集来检查

  1. 在hadoop目录下创建test-in目录作测试用:
    hadoop fs -mkdir /test-in
    查看目录是否创建 hadoop fs -ls /
    直接ls查看不到test-in目录不用慌,因为HDFS是hadoop原生态的文件系统,这条命令是在hdfs文件系统下创建目录,所以要用hadoop的命令方式才能查看到你的文件
  2. 在本地写一个测试文件
    内容如下
    在这里插入图片描述我的文件是在workspace文件夹下,把它复制到test-in目录下
    hdfs dfs -copyFromLocal /home/xjm/class3_spring/big-data/workspace/little /test-in
    hadoop fs -ls /test-in看一下是否复制成功了
    在这里插入图片描述
    查看文本内容hdfs dfs -cat /test-in/小说第一
  3. 运行测试
    在pom文件指定程序入口com.xjm.TF
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有test-out目录,
    可以通过hadoop fs -rm -r -skipTrash /test-out删除
  4. 查看结果
    查看你的输出目录hadoop fs -ls /test-out
    设置了几个reducer就有几个输出文件
    hadoop fs -cat /test-out/part-r-00000
    可以看到已经正确输出了
    在这里插入图片描述

IDF

实验要求TF是按作品数,一个作品被分成了多个文件,像这样,

在这里插入图片描述

而这里的IDF是按文件数算,所以上一步的输出没法利用。

这里我是直接在setup累加的总文件数,所以是默认一个文件对应一个map;
我查到说由于文件大小和设置分区大小,分片数可能!=文件数,这样的话总文件数有问题,我想用set记录,然后算set的大小解决;但是随之想到,同时也可能出现一个文件中分成多片后,word被记录了多次,所以这里还要另外处理,嗯…
不过,默认配置的话,文件还没大到那个程度,庆幸
RE:改了文档数统计,之前用static,结果一直是0,叹气

mapper

输入:key:当前行偏移位置, value:当前行内容

输出:key:word, value:1(填充值,无意义)

public static class IDFMapper extends Mapper
{ private final Text one = new Text("1"); private Text word = new Text(); @Override protected void map(Object key, Text value, Mapper
.Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } }

combiner

将相同key值记录合并,一个word在文档中出现应该只有一条记录

public static class TFCombiner extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { if (values == null) { return; } context.write(key, new Text("1")); } }

partitioner

按照word进行分区

public static class IDFPartitioner extends Partitioner
{ @Override public int getPartition(Text key, Text value, int numPartitions) { String word = key.toString(); numPartitions = taskNum; return Math.abs((word.hashCode() * 127) % numPartitions); } }

reducer

输入:key:word, value:{1,1…}

输出:key:word ,value:idf

public static class IDFReducer extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { if (values == null) { return; } int fileCount = 0; for (Text value : values) { fileCount += Integer.parseInt(value.toString()); } double idfValue = Math.log10(1.0 * totalFiles / (fileCount + 1)); context.write(key, new Text(String.valueOf(idfValue))); } }

MAIN

在输入的输出路径名加上IDF后缀以区分,通过API计算文件总数

这个计算数目有点问题,是非递归的

public static void main(String[] args) {           Configuration conf = new Configuration();        try {               Job job = Job.getInstance(conf);            job.setJobName("IDF-job");            job.setJarByClass(IDFMR.class);            job.setMapperClass(IDFMR.IDFMapper.class);            job.setCombinerClass(IDFMR.IDFCombiner.class);            job.setPartitionerClass(IDFMR.IDFPartitioner.class);            job.setNumReduceTasks(taskNum);            job.setReducerClass(IDFMR.IDFReducer.class);            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            Path inputPath = new Path(args[0]);            // 计算文档总数并加入设置            job.setProfileParams(String.valueOf(getFilesNum(conf, inputPath)));            String outputPath = args[1] + "IDF";            Path fileOutPath = new Path(outputPath);            FileSystem fileSystem = FileSystem.get(conf);            if (fileSystem.exists(fileOutPath)) {                   fileSystem.delete(fileOutPath, true);            }            FileInputFormat.addInputPath(job, inputPath);            FileOutputFormat.setOutputPath(job, fileOutPath);            job.waitForCompletion(true);        } catch (Exception e) {               e.printStackTrace();        }    }
private static int getFilesNum(Configuration conf, Path inputPath) throws FileNotFoundException, IOException {           FileSystem fs = FileSystem.get(conf);        FileStatus status[] = fs.listStatus(inputPath);        return status.length;    }

测试

  1. 运行测试
    在pom文件指定程序入口com.xjm.IDF
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有输出目录,
    可以通过hadoop fs -rm -r -skipTrash /test-outIDF删除
  2. 查看结果
    查看你的输出目录hadoop fs -ls /test-outIDF
    设置了几个reducer就有几个输出文件
    hadoop fs -cat /test-outIDF/part-r-00000
    老是卡住跑不出来,好不容易跑出来了,结果是infnity,叹气,看了下是总文件数计算的问题
    调了调,跑出来了
    为了验证,我打了三行,分别是包含的文件数,总文件数,idf,验算了下没问题
    在这里插入图片描述
    验算完之后就可以把多余的两行删掉了
    在这里插入图片描述

TF-IDFMR

分别计算完tf 和idf之后,将两个结果合并

TF的输出格式是 word:workname tf
IDF的输出格式是 word idf
把这两个作为输入,得到的输出是 workname word tf-idf
文件输入进来mapper的输入key 是行开头字符偏移量,value是行内容

mapper

把输入格式处理成

word workname#tf
word idf

public static class TFIDFMapper extends Mapper
{ @Override protected void map(Object key, Text value, Mapper
.Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); String str1 = tokenizer.nextToken(); String str2 = tokenizer.nextToken(); if (str1.indexOf(":") != -1){ String str3 = str1.split(":")[0]; String str4 = str1.split(":")[1]; context.write(new Text(str3), new Text(String.join("#", str4, str2))); } else context.write(new Text(str1), new Text(str2)); } }

reducer

找出不含#的value作为idf,其他的依次分割出作品名和tf,并计算

按要求格式输出

public static class TFIDFReducer extends Reducer
{ private Text workname = new Text(); private Text wordval = new Text(); @Override protected void reduce(Text key, Iterable
values, Reducer
.Context context) throws IOException, InterruptedException { if (values == null) { return; } double idf = 1.0; ArrayList
list = new ArrayList
(); for (Text val : values) { String str = val.toString(); if (str.indexOf("#") != -1) list.add(str); else idf = Double.parseDouble(str); } for(String str:list){ workname.set(str.split("#")[0]); int tf = Integer.parseInt(str.split("#")[1]); String tfidf = String.valueOf(idf*tf); wordval.set(String.join(" ",key.toString(),tfidf)); context.write(workname, wordval); } } }

main

在输入路径加入相应后缀

public static void main(String[] args) {           Configuration conf = new Configuration();        try {               Job job = Job.getInstance(conf);             job.setJobName("TFIDF-job");            job.setJarByClass(TFIDFMR.class);            job.setMapperClass(TFIDFMR.TFIDFMapper.class);                job.setPartitionerClass(TFIDFMR.TFIDFPartitioner.class);              job.setNumReduceTasks(taskNum);            job.setReducerClass(TFIDFMR.TFIDFReducer.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            //            String inputPath1 = args[0] + "TF";            String inputPath2 = args[0] + "IDF";            String outputPath = args[1] + "TFIDF";            Path fileOutPath = new Path(outputPath);            FileSystem fileSystem = FileSystem.get(conf);            if (fileSystem.exists(fileOutPath)) {                   fileSystem.delete(fileOutPath, true);            }              FileInputFormat.addInputPaths(job, String.join(",", inputPath1, inputPath2));            FileOutputFormat.setOutputPath(job, fileOutPath);            job.waitForCompletion(true);        } catch (Exception e) {               e.printStackTrace();        }    }

测试

  1. 运行测试
    在pom文件指定程序入口com.xjm.TFIDFMR
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-out /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有输出目录,
    可以通过hadoop fs -rm -r -skipTrash /test-outIDF删除
    [RE:代码加了检测,已经存在的话会先自动删除]
  2. 查看结果
    查看你的输出目录hadoop fs -ls /test-outIDF
    在这里插入图片描述

client

三个单元分别测试成功后,写一个工作台将三个连接起来工作

package com.xjm;public class Client {       public static void main(String[] args) {           TFMR.main(args);        IDFMR.main(args);        String[] nargs = {   args[1],args[1]};        TFIDFMR.main(nargs);      }}
  1. 运行测试
    在pom文件指定程序入口为com.xjm.Client
    hadoop jar TF-IDF-1.0-SNAPSHOT.jar /test-in /test-out
    (调整为自己的jar包路径)
    输出目录不能事先存在,只能由程序创建,如果已经有输出目录,
    可以通过hadoop fs -rm -r -skipTrash /test-out删除
    [RE:代码加了检测,已经存在的话会先自动删除]
  2. 查看结果
    查看你的输出目录hadoop fs -ls /test-out
    为了比对输出内容,我写的是test-out1
    在这里插入图片描述
    贴个全的,和之前对比一致
    在这里插入图片描述

POM

程序入口

maven-assembly-plugin
false
jar-with-dependencies
com.xjm.TFMR
make-assembly
package
assembly

一些遇到的问题

The import org.apache cannot be resolved

我是用maven管理的项目,在pom文件里加入相关依赖就好了。

如果下载不成功,有很多原因;你可以在classpath加入你下的hadoop jar包;或者换源下载,我用阿里云的昨天一直下不了,1是老仓库的地址更新了。2 深更半夜好像连接不好,终端一直显示unknown hosts,今早重试又可以了…

跑了很久都没有结果

看8088界面状态信息,以及确保你的各节点成功开启了

我又遇到unhealthy nodes了…
删磁盘,重启几次虚拟机又能跑了…

waiting for AM container to be allocated, launched and register with RM.

被挂起了,原因是资源不够

转载地址:http://kumq.baihongyu.com/

你可能感兴趣的文章
Mysql学习总结(19)——Mysql无法创建外键的原因
查看>>
Mysql学习总结(21)——MySQL数据库常见面试题
查看>>
Mysql学习总结(22)——Mysql数据库中制作千万级测试表
查看>>
Mysql学习总结(23)——MySQL统计函数和分组查询
查看>>
Mysql学习总结(24)——MySQL多表查询合并结果和内连接查询
查看>>
Mysql学习总结(25)——MySQL外连接查询
查看>>
Mysql学习总结(26)——MySQL子查询
查看>>
Mysql学习总结(37)——Mysql Limit 分页查询优化
查看>>
Mysql学习总结(38)——21条MySql性能优化经验
查看>>
Mysql学习总结(45)——Mysql视图和事务
查看>>
Mysql学习总结(58)——深入理解Mysql的四种隔离级别
查看>>
Mysql学习总结(59)——数据库分库分表策略总结
查看>>
Mysql学习总结(80)——统计数据库的总记录数和库中各个表的数据量
查看>>
Mysql学习总结(83)——常用的几种分布式锁:ZK分布式锁、Redis分布式锁、数据库分布式锁、基于JDK的分布式锁方案对比总结
查看>>
MySQL定义和变量赋值
查看>>
Mysql实战之数据备份
查看>>
mysql实现成绩排名
查看>>
Mysql客户端中文乱码问题解决
查看>>
mysql导入数据库出现:Incorrect string value: '\xE7\x82\xB9\xE9\x92\x9F' for column 'chinese' at row 1...
查看>>
Mysql工作笔记006---Mysql服务器磁盘爆满了_java.sql.SQLException: Error writing file ‘tmp/MYfXO41p‘
查看>>