本文不涉及MapReduce的原理介绍,只是从源代码的层面讲讲我对Hadoop的MapReduce的执行过程、数据流的一点理解。
首先贴上一张来之于Yahoo Hadoop 教程
的图片
由上图可以看出,在进入Map之前,InputFormat把存储在HDFS的文件进行读取和分割,形成和任务相关的InputSplits,然后RecordReader负责读取这些Splits,并把读取出来的内容作为Map函数的输入参数。下面我就从代码执行的角度来看,数据是如何一步步从HDFS的file到Map函数的。在Yahoo Hadoop 教程
中已经详细讲解了这一过程。但我作为一个细节控,更想从源代码的级别去理清这一过程,这样我才觉得踏实,才觉得自己真真切切地掌握了这个知识点,因此我仔细阅读了这部分的源代码,写篇博客记录下来,以便以后自己查看。
首先,在Mapper类的run方法中,map函数被循环调用:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
...................................
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
在run方法中,每调用一次context.nextKeyValue(),就执行一遍map方法,而此处的context实际上是实现了Context接口的MapContextImpl(这一点可以在MultithreadedMapper的run方法看出来),其nextKeyValue,getCurrentKey,getCurrentValue方法为:
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return reader.getCurrentKey();
}
@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return reader.getCurrentValue();
}
上述代码中的实际上是由reader来完成nextKeyValue的工作,reader是RecordReader实例,RecordReader就是用来读取各个task的splits,产生map函数的输入参数。实现RecordReader接口的类由很多,那此处的reader到底是那个类的实例呢?我们到创建context的地方去看一看。
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(inputFormat.createRecordReader(split, taskContext), reporter);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
..............
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
上面代码中的input是一个NewTrackingRecordReader实例,而NewTrackingRecordReader则是对inputFormat.createRecordReader(split, taskContext), reporter)返回的RecordReader对象的封装,inputFormat是InputFormat类的实例,InputFormat类定义了如何分割可读取文件,
public abstract class InputFormat<K, V> {
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
读取文件主要是通过其创建的RecordReader来完成的。Hadoop自带了好几种输入格式,关于输入格式的具体描述可以参考此处
Yahoo Hadoop 教程
。JobContextImpl中包括了InputFormat的get和set方法,默认的实现是
TextInputFormat
---读取文件的行,行的偏移量为key,行的内容为value。我们可以通过重写InputFormat中的isSplitable和createRecordReader来实现自定义的InputFormat,并通过JobContextImpl中的set方法来在map中采用自己的输入格式。
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
因为读物文件是通过RecordReader完成的,因此接下来看看TextInputFormat中的
RecordReader是什么?
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes();
return new LineRecordReader(recordDelimiterBytes);
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
......................
}
}
可见,TextInputFormat中,创建的RecordReader为LineRecordReader,”textinputformat.record.delimiter“指的是读取一行的数据的终止符号,即遇到“
textinputformat.record.delimiter
”所包含的字符时,该一行的读取结束。可以通过Configuration的set()方法来设置自定义的终止符,如果没有设置
textinputformat.record.delimiter,那么Hadoop就采用以CR,LF或者CRLF作为终止符,这一点可以查看LineReader的readDefaultLine方法
。查看LineRecordReader的实现就知道为什么上面说
TextInputFormat是以行的偏移量为key,行的内容为value了。来看看其中的几个主要的方法:
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
......
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
if (null == this.recordDelimiterBytes){
in = new LineReader(cIn, job);
} else {
in = new LineReader(cIn, job, this.recordDelimiterBytes);
}
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
if (null == this.recordDelimiterBytes) {
in = new LineReader(codec.createInputStream(fileIn, decompressor),
job);
} else {
in = new LineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
}
filePosition = fileIn;
}
} else {
fileIn.seek(start);
if (null == this.recordDelimiterBytes){
in = new LineReader(fileIn, job);
} else {
in = new LineReader(fileIn, job, this.recordDelimiterBytes);
}
filePosition = fileIn;
}
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
inputByteCounter.increment(newSize);
if (newSize < maxLineLength) {
break;
}
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
首先在initialize方法里,根据传入的FileSplit来获取到当前读取文件的path,起始位置,并以此创建真正的文件读取流in,我们可以看见在nextKeyValue方法里,就是由in来读取文件,更新key和value的值。
至此,Hadoop如何把文件数据读取出来,并以何种方式传给Map函数,就一目了然了,同时也更加理解了
Yahoo Hadoop 教程
里面提到的譬如FileInputFormat的默认实现,TextInputFormat是如何实现Key-Value组合等等内容。最大的好处在于,如果我要实现一些自定义的东西,我应该如何去修改代码,如何去在合适的地方嵌入自定义的东西。
相关推荐
Hadoop MapReduce 是 Hadoop 平台根据 MapReduce 原理实现的计算框架,目前已经实现了两个版本,MapReduce ...一个 Hadoop MapReduce 作业(job)的基本工作流程就是,首先把存储在 HDFS 中的输入数据集切分为若干个独
大数据技术的两个基本要点是分布式存储和多点并行运算,Hadoop的分布式文件系统HDFS和并行运算框架Hadoop MapReduce分别予以了实现。计算机集群中的每个节点既是存储节点,也是运算节点,HDFS将大数据文件分布存储在...
近百节课视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 课程目录 000 上课方式和课程大纲介绍 001 Linux系统基本知识说明和启动Linux虚拟机 ...066 Hadoop MapReduce框架数据类型讲解 067
下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。 写自己的逻辑,对...
(3)对于大数据清洗和分析的基本操作流程 清洗不符合规范的数据以及不需要采用的特殊数据、通过字典简化数据格式(如实验第二步中用到的行为地址基础数据)、分析数据中对于生产或研究有意义的数据(如实验第五步...
在hadoop中,有三大法宝——HDFS,MapReduce,Hbase,但是无论是MapReduce,Hbase还是hadoop中的其他组件如:Hive等他们要处理的数据还是处理完了的数据都是存储在HDFS中。可见HDFS可以说是hadoop存储的基础和核心,...
基于hadoop的好友推荐系统 使用 MapReduce 内含系统说明文件
从计算上来说,它通过MapReduce模型,将大数据的计算分发到多台计算机上完成,再将结果合并,减少计算的时间。 Hadoop适合于: 1、超大数据的计算; 2、一次写入、多次读取的模式; 3、可以跑在普通的硬件上。 ...
一个基于Hadoop平台进行的单词统计系统,其中包含了伪分布架构,并且包含HDFS数据存储,结合Java后台利用Mapreduce架包进行单词的统计与分析。包含了完整的实践过程,内涵源代码,以及实验命令,内容丰富,实验过程...
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
同时,本文基于Mahout技术实现了这两个分类算法在MapReduce框架上的海量数据流的分类计算,极大地提高了异常流量检测的效率。最后通过实验证明,基于分类器联合的分布式异常流量检测算法可以快速有效地对海量网络数据流...
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce 2.0的框架结构和源码分析,MapReduce 2.0的配置与测试,MapReduce 2.0运行流程,MapReduce 2.0高级程序设计以及相关特性等内容。...
数据处理流程 ⽹站流量⽇志数据分析是⼀个纯粹的数据分析项⽬,其整体流程基本上就是依据数据的处理流程进⾏。有以下⼏个⼤的步骤: 1.1 数据采集 数据采集概念,⽬前⾏业会有两种解释:⼀是数据从⽆到有的过程...
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
Hadoop开发者第四期: 海量数据处理平台架构演变; 计算不均衡问题在Hive 中的解决办法; Join 算子在Hadoop 中的实现; 配置Hive 元数据DB 为PostgreSQL; ZooKeeper 权限管理机制;...Hadoop 技术论坛运营数据分享
技术点5 使用Sqoop 从MySQL 导入数据 2.2.4 HBase 技术点6 HBase 导入HDFS 技术点7 将HBase 作为MapReduce 的数据源2.3 将数据导出Hadoop 2.3.1 将数据导入本地文件系统技术点8 自动复制HDFS 中的文件...
但是采用MapReduce编程模型开发一个数据密集型应用,用户不仅需要对各个数据操作按照MapReduce进行实现,还要实现多个操作之间中间数据的传输,实现复杂的数据处理流程,另外采用MapReduce编程模型编写的代码十分...
技术栈:Java、maven、hadoop、flask、前端知识步骤搭建hadoop 分布式环境,hdfs 存储训练数据,MapReduce(MR)进行训练数据