Mapreduce学习笔记

Mapreduce学习笔记

管理员
2025年6月25日
大数据
#mapreduce

1 介绍

(1) MapReduce是一个用于处理海量数据的分布式计算框架这个框架解决了:

  1. 数据分布式存储
  2. 作业调度
  3. 容错
  4. 机器间通信等复杂问题

(2) MapReduce只负责数据计算,不负责存储,数据是存储在HDFS上,因为HDFS:系统可靠、可扩展、可并发处理

(3) MapReduce缺点:

  1. MapReduce过于底层,编写Map,Reduce函数较为困难
  2. 不是所有算法都能用MapReduce实现
  3. 不适合实时响应的需求。

(4) MapReduce处理数据方式

  1. 离线处理方式
  2. 海量数据集:GTP级都能处理
  3. 全量数据集同时处理:一次性同时处理整个数据集
  4. 批处理方式:大数据输入,大批数据输出
  5. 吞吐能力强
  6. 实效性不高

2 MapReduce编程模型

Map 和 Reduce 的概念是从函数式编程语言中借来的,整个 MapReduce 计算过程分为 Map 阶段和 Reduce 阶段,也称为映射和缩减阶段,这两个独立的阶段实际上是两个独立的过程,即 Map 过程和 Reduce 过程,在 Map 中进行数据的读取和预处理,之后将预处理的结果发送到 Reduce 中进行合并。

3 MapReduce与spark对比

(1) 两者都是分布式计算框架;

(2) MapReduce是多进程并发方式,优点:多进程的并发模型便于每个任务占用资源进行控制调配,进程空间是独享的;缺点:相比线程来说会消耗更的启动时间,所以MapReduce实效性不高,只适合做批量操作,高吞吐的情况下不能寄托它太多的实效性。

(3) Spark采用的是多线程并发模型,运行更快,实效性更好;但是多线程出现多个节点资源竞争,相比于多进程,不清楚每个节点需要多少资源,所以在大作业来说,spark高并发任务是没有mapreduce运行稳定的。

4 单机计算执行流程

[MISSING IMAGE: , ]

单机简单计算数据流程:

读取数据,对数据进行逻辑处理,write模块对处理后的数据进行输出,固化到本地磁盘上。

5 MapReduce执行流程

[MISSING IMAGE: , ]

(1) 输入和拆分

Input data:数据输入,数据都是存储在HDFS中的

InputForput:MR框架基础类之一,进行数据格式化操作,分为两部分:

  1. 数据分割(Data split):就是在合适的位置对Block进行切割,保证你的句子的完整性(解决跨block问题)
  2. 记录读取器(Record Reader):读取切分好的split,每读取一个split,调用一次map函数,直到split的尾部

(2) Map映射

标准输入导入切分好的split分片,每条记录调用执行一次map()函数,就会在内存中增加数据,映射成<k,v>pairs对,当内存数据达到一定值,会把这些键值对映射成新的键值溢写到磁盘;

(3) Shuffle派发(map到reduce)

Shuffle过程是指Mapper产生的直接输出结果,经过一系列的处理,成为最终的Reducer直接输入数据为止的整个过程。这是mapreduce的核心过程。该过程可以分为两个阶段:

Mapper端的Shuffle:由Mapper产生的结果并不会直接写入到磁盘中,而是先存储在内存中,当内存中的数据量达到设定的阀值时,一次性写入到本地磁盘中(并不是HDFS)。并同时进行sort(排序)、combine(合并)、partition(分桶)等操作。其中,sort 是把 Mapper产生的结果按照key值进行排序;combine是把key值相同的记录进行合并;partition是把数据均衡的分配给 Reducer。

Reducer端的Shuffle:由Mapper和Reducer往往不在同一个节点上运行,所以Reducer需要从多个节点上下载 Mapper的结果数据,并对这些数据进行处理,然后才能被Reducer处理。

(4) Reduce 缩减

多个reduce任务输出的数据都属于不同的partition,因此结果数据的key不会重复,合并reduce的输出文件即可得到最终的结果,最终结果直接写入hdfs,每个 reduce 进程会对应一个输出文件,名称以 part-开头。

6 Shuffle过程

(1) Shuffle:partion、sort、spill、meger、combiner、copy、memery、disk

(2) MemoryBuffer:每个map的结果和partition处理的结果都保存在缓存中

  1. partition和sort都是在这里处理好再溢写出磁盘的
  2. 缓冲区大小:默认100M,溢写阈值:100M * 0.8 = 80M

(3) partition:决定数据由哪个reduce处理,按key选reduce,从而分区

比如采用hash法,有n个reduce,那么数据{“are”:1}的key“are”对n进行取模,返回m,而生成{partition,key,value};

(4) Sort:缓冲区数据按照key进行排序;

(5) Spill:内存缓冲区达到阈值时,溢写spill线程锁住这80M的缓冲区,开始将数据写出到本地磁盘中,然后释放内存。每次溢写都生成一个数据文件。溢出的数据到磁盘前会对数据进行key排序sort以及合并combiner;

(6) Combiner:数据合并,相同的key的数据,value值合并,可以理解为部分的reduce在map里面完成,减少partition的索引数量,合并完后再从磁盘上传递给reduce,目的有两个:一是尽量减少每次写入磁盘的数据量;二是尽量减少下一次复制阶段网络传输的数据量。

(7) Reduce里面也有一个内存缓冲区,它这个内存缓冲区跟Map的内存缓冲区一样,内存缓存满时,也通过sort和combiner,将数据溢写到硬盘文件中(reduce端的缓存设置更灵活,此时reduce函数未运行,也可以占用较大的内存)

(8) 归并排序:开始把指针都指向各自文件的文件头上,然后开始互相之间比较,就是比如三个文件,每一个文件都有一个指针,然后相当于每一个指针都相当于按照从大到小已经排序好的,那么这个时候去往这个大的数据上开始合并。归并排序的前提是你指的每一个数据的都是先排好序的,这就是归并排序。然后做好归并排序之后产生大的文件也是排好序的。

7 工作原理

(1) JobTracker主进程

  1. 负责接收客户作业提交,调度任务到作节点上运行
  2. 监控工作节点状态及任务进度等
  3. Jobtracker利用一个线程池来同时处理心跳和客户请求
  4. 一个MapReduce集群只有一个jobtracker。

(2) tasktracker工作节点

  1. TaskTracker是具体的job执行者
  2. 通过周期性(3秒)的心跳来通知jobtracker其当前的健康状态,每一次心跳包含了可用的map和reduce任务数目、占用的数目以及运行中的任务详细信息。
  3. 在每一个工作节点上永远只会有一个tasktracker,但集群中会有多个tasktracker。

(3) Slave主动向master拉任务

(4)在一个TaskTracker中,从JobTracker获取的job会被分成Map task 和Reduce task,分别由Mapper 和 Reducer 来执行。

8 工作流程

当用户提交一个MapReduce作业时:

(1) 通过客户端节点(client node)提交MapReduce任务请求;

(2) Jobtracker接受到任务后,将任务拆分,调度给空闲的tasktracker,tasktracker在执行任务时,会对Jobtracker返回进度报告,Jobtracker则会记录任务进行情况,一旦出现某个tasktracker任务执行失败,Jobtracker则会把任务分配给另一台tasktracker,直到任务完成为止;

(3) 对于每一个job, tasktracker会将该任务分为Map task 和Reduce task 分别执行。Map task会在tasktracker的Map槽点执行,结果的汇总计算工作会在 tasktracker 的 Reduce槽点执行,最终返回Reduce task的结果。

9 数据本地化

一般地,将 NameNode 和 JobTracker 部署到同一台机器上,各个 DataNode 和 TaskNode 也同样部署到同一台机器上, 这样做的目的是将 map 任务分配给含有该 map 处理的数据块的 TaskTracker 上,同时将程序 JAR 包复制到该 TaskTracker 上来运行,这叫“运算移动,数据不移动”。而分配reduce 任务时并不考虑数据本地化。

多副本,目的是容错,数据层面做到高可用。

10 hadoop调度器

(1) 默认调度器FIFO

Hadoop中默认的调度器,采用先进先出原则;

优先级:very_high、high、normal、low、very low

(2) 计算能力调度器Capacity Scheduler

选择占用资源小,优先级高的先执行

(3) 公平调度器Fair Scheduler

同一队列中的作业公平共享队列中所有资源

11 关于map

(1) Map的个数为split分份数;

(2) 压缩文件不可切分,就是不经过split,一个压缩文件必须由一个map处理;(控制100个map的最好办法就是把100个文件压缩成压缩文件)

(3) 非压缩文件和sequence文件可以切分

(4) dfs.block.size决定block大小

12 关于reduce

(1) reduce个数设置:mapred.reduce.tasks,默认为1;

(2) reduce个数太少:单次执行慢,出错再试成本高;

(3) reduce个数太多:shuffle开销大,输出大量小文件。

13 错误处理

(1) TaskTracker心跳回应失败问题

如果此时执行到Map任务,交由其他TaskTracker节点重新执行该任务,如果此时执行到Reduce任务,交由其他TaskTracker重新执行Reduce任务,不用再从map任务开始.

(2) TaskTracker自身执行任务失败

重新向JobTracker申请新任务,这样的失败次数有4次(可设置),再之后就整个作用也就失败了。

14 streaming开发

Mapreduce和HDFS底层都是采用java去实现的,默认提供java编程接口,而streaming框架是允许任务编程语言去开发大数据的

15 Strieaming优点

(1) 开发效率高

只需按照一定的格式从标准输入读取数据、向标准输出写数据就可以

容易单机调试:cat input |mapper | sort | reducer > output

(2) 程序运行效率高

对于CPU密集的计算,有些语言如C/C++编写的程序可能比用Java效率更高一些

(3) 便于平台进行资源控制

Streaming框架中通过limit等方式可以灵活地限制应用程序使用的内存等资源

16 Streaming局限

(1) streaming默认方式是处理文本数据,对于二进制数据处理还需要将二进制的key和value进行base64的编码转化成文本

(2) 需要两次(本身java框架一次,再另外开发的语言一次,如果使用java就一次)数据拷贝和解析(分隔),带来一定的开销

17 Streaming开发要点

(1) input:指定作业的输入文件的HDFS路径,支持使用*通配符,支持指定多个文件或目录,可以多次使用

(2) output:指定作业的输出文件的HDFS路径,路径必须不存在,并且具备执行作业用户有创建该目录的权限,只能使用一次

(3) mapper:用户自己写的mapper程序

(4) reduce:用户自己写的reduce程序

(5) 分发

  1. file :把本地文件分发到各个节点
  2. cachefile:把hdfs的压缩文件分发到各个节点
  3. archivefile:把hdfs的目录分发到各个节点

(6) jobconf:提交作业的一些配置属性,常见配置:

  1. mapred.map.tasks:map task数目
  2. mapred.reduce.tasks:reduce task数目
  3. stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
  4. num.key.fields.for.partition:指定对key分出来的前几部分做partition,而非整个key
  5. mapred.compress.map.output:map的输出是否压缩
  6. mapred.map.output.compression.codec:map的输出压缩方式
  7. mapred.output.compress:reduce的输出是否压缩
  8. mapred.output.compression.codec:reduce的输出压缩方式

18 如何杀死一个job?

(1) Hadoop job –list 拿到job-id

(2) Hadoop job -kill job-id

最后更新于 2025/6/25