运行 Hadoop 伪分布式实例

伪分布式读取的是 HDFS 上的数据,要使用 HDFS,首先需要在 HDFS 中创建用户目录:

1
[root@slave3 ~]# hdfs dfs -mkdir -p /user/root

接着将 ./etc/hadoop 中的 xml 文件作为输入文件复制到分布式文件系统中,即将 /usr/local/hadoop/etc/hadoop 复制到分布式文件系统中的 /user/root/input 中。我们使用的是 root 用户,并且已创建相应的用户目录 /user/root,因此在此命令中就可以使用香断路径如 input,其对应的绝对路径就是 /user/root/input:

1
2
[root@slave3 ~]# hdfs dfs -mkdir input
[root@slave3 ~]# hdfs dfs -put /usr/local/hadoop/etc/hadoop/*.xml input

复制完成后,可以通过如下命令查看文件列表:

1
2
3
4
5
6
7
8
9
10
[root@slave3 ~]# hdfs dfs -ls input
Found 8 items
-rw-r--r-- 1 root supergroup 4436 2016-11-30 01:05 input/capacity-scheduler.xml
-rw-r--r-- 1 root supergroup 1111 2016-11-30 01:05 input/core-site.xml
-rw-r--r-- 1 root supergroup 9683 2016-11-30 01:05 input/hadoop-policy.xml
-rw-r--r-- 1 root supergroup 1177 2016-11-30 01:05 input/hdfs-site.xml
-rw-r--r-- 1 root supergroup 620 2016-11-30 01:05 input/httpfs-site.xml
-rw-r--r-- 1 root supergroup 3518 2016-11-30 01:05 input/kms-acls.xml
-rw-r--r-- 1 root supergroup 5933 2016-11-30 01:05 input/kms-site.xml
-rw-r--r-- 1 root supergroup 690 2016-11-30 01:05 input/yarn-site.xml

伪分布式云新 MapReduce 作业的方式跟单击模式相同,区别在于伪分布式读取的是 HDFS 中的文件(可以将单机目录中创建的本地 input 文件夹,输出结果 output 文件夹都删掉来验证这一点)。

1
[root@slave3 hadoop]# hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha1.jar grep input output 'dfs[a-z.]+'

查看运行结果的命令(查看的是位于 HDFS 中的输出结果):

1
2
3
4
5
[root@slave3 hadoop]# hdfs dfs -cat output/*
1 dfsadmin
1 dfs.replication
1 dfs.namenode.name.dir
1 dfs.datanode.data.dir

我们也可以将运行结果取回本地:

1
2
3
4
5
6
7
[root@slave3 hadoop]# rm -rf ./output   # 先删除本地的 output 文件夹(如果存在的话)
[root@slave3 hadoop]# hdfs dfs -get output ./output # 将 HDFS 上的 output 文件夹拷贝到本机
[root@slave3 hadoop]# cat ./output/*
1 dfsadmin
1 dfs.replication
1 dfs.namenode.name.dir
1 dfs.datanode.data.dir

运行 Hadoop 程序时,为了防止覆盖结果,程序指定的输出目录(如 output)不能存在,否则会提示错误,因此运行前需要先删除输出目录。

上述通过 start-dfs.sh 启动 Hadoop,仅仅是启动了 MapReduce 环境,我们也可以启动 YARN,让 YARN 来负责资源管理与任务调度。这里需要修改 mapred-site.xml 和 yarn-site.xml 两个配置文件:

mapred-site.xml 配置文件修改如下:(mapred-site.xml是从mapred-site.template拷贝过来的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
</configuration>

yarn-site.xml 配置文件修改如下:

1
2
3
4
5
6
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>

然后就可以启动 YARN 了(当然需要先执行 start-dfs.sh)

1
2
start-yarn.sh
mapred --daemon start historyserver #开启历史任务查看

启动 YARN 之后,运行实例的方法还是一样的,仅仅是资源管理方式、任务调度不同。观察日志信息可以发现,不启用 YARN 时,是 mapred.LocalJobRunner 在跑任务,启用 YARN 之后,是 YARN 在跑任务。启动 YARN 有个好处是可以通过 Web 界面查看任务的运行情况。

但 YARN 主要是为集群提供更好的资源管理与任务调度,然而这在单机上体现不出价值,反而会使程序跑的稍慢些。因此在单机上是否开启 YARN 就看实际情况了。

Hadoop 安装教程-单机/伪分布式配置

环境

1
2
3
4
5
6
7
8
9
# 系统版本
[root@slave3 hadoop]# cat /etc/redhat-release
CentOS Linux release 7.2.1511 (Core)

# 本机IP:192.168.1.43

# 本机HostName:slave3

# 使用 yum -y update 更新系统

安装

  • 主机名修改及Host修改
1
2
hostnamectl set-hostname slave3
vim /etc/hosts # 最后增加一行 192.168.1.43 slave3
  • 安装常用软件及java
1
2
yum install java-1.8.0-openjdk*
yum install wget vim net-tools
  • 确认java安装目录

通过 whereis java 以及 ls -l 的方式定位 java 安装目录,这里是
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64
具体可以参考:http://ttlop.com/2016/11/25/%E6%9F%A5%E6%89%BE%E9%80%9A%E8%BF%87Yum%E5%AE%89%E8%A3%85%E7%9A%84Java%E7%9A%84%E5%AE%89%E8%A3%85%E8%B7%AF%E5%BE%84/

  • hadoop下载及配置
1
2
3
4
5
6
7
8
# 下载
wget http://apache.fayea.com/hadoop/common/hadoop-3.0.0-alpha1/hadoop-3.0.0-alpha1.tar.gz
# 解压
tar zxvf hadoop-3.0.0-alpha1.tar.gz
# 移动
mv hadoop-3.0.0-alpha1 /usr/local/hadoop
# 修改权限及确认
cd /usr/local && chown -R root.root hadoop && ls -l hadoop && cd hadoop && ls -al
  • SSH 权限修改

通过ssh-keygen 及 authorized_keys 文件修改SSH登录权限。
具体可以参考:http://ttlop.com/2016/11/25/SSH-%E6%97%A0%E5%AF%86%E7%A0%81%E7%99%BB%E5%BD%95%E8%8A%82%E7%82%B9/

  • 关闭防火墙即selinux
1
2
3
# 这里为测试环境,我们将安全选项都关闭
setenforce 8
systemctl stop firewalld
  • 配置修改
1
2
3
4
5
# PATH 修改(当前终端有效)
export PATH=$PATH:/usr/local/hadoop/bin:/usr/local/hadoop/sbin
# hadoop 中 JAVA_HOME修改
vim /usr/local/hadoop/etc/hadoop/hadoop-env.sh
增加 export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.111-1.b15.el7_2.x86_64
  • 单机配置测试

Hadoop 默认模式为非分布式模式,无需进行其它配置即可运行。非分布式即单 Java 进程,方便进行调试。
以下面为例,我们选择运行 grep 例子,我们将 input 文件夹中的所有文件作为输入,筛选当前符合正则表达式 dfs[a-z.]+ 的单词并统计出现的次数,最后输出结果到 output 文件夹中。

1
2
3
4
5
cd /usr/local/hadoop
mkdir ./input
cp ./etc/hadoop/*.xml ./input # 将配置文件作为输入文件
./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep ./input ./output 'dfs[a-z.]+'
cat ./output/* # 查看运行结果
  • 伪分布式配置

Hadoop 可以在单节点上以伪分布式的方式运行,Hadoop 进程以分离的 java 进程来运行,节点既作为 NameNode 也作为 DataNode,同时,读取的是 HDFS 中的文件。

Hadoop 的配置文件位于 $HADOOP_HOME/etc/hadoop中,伪分布式需要修改2个配置文件 core-site.xml和hdfs-site.xml。Hadoop 的配置文件是 xml 格式,每个配置以声明 property 的 name 和 value 的方式来实现。
如下是修改后的 core-site.xml文件:

1
2
3
4
5
6
7
8
9
10
11
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/home/hadoop/tmp</value>
<description>Abase for other temporary directories.</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

如下是修改后的 hdfs-site.xml文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/home/hadoop/tmp/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/home/hadoop/tmp/dfs/data</value>
</property>
</configuration>

配置完成后,执行 NameNode 的格式化

1
hdfs namenode -format

接着开启 NameNode 和 DataNode 守护进程

1
start-dfs.sh

启动完成后,可以通过命令 jps 来判断是否成功启动,若成功启动则会列出如下进程:

1
2
3
4
5
[root@slave3 dfs]# jps
50290 SecondaryNameNode
50617 Jps
50090 DataNode
49995 NameNode

Centos 7 LVM 磁盘扩容

Hyper 主机有一台 Centos 7 虚拟机,系统是运行 Hadoop 环境,因为调试需要,准备扩容磁盘来增加空间。如何增加虚拟机磁盘空间的操作方法这里不作过多说明,都是图形化操作界面。

这里我们重点在介绍在 Centos 7中的操作。

  • 查看现有的硬盘分区(现有空间没有变大)
1
2
3
4
5
6
7
8
9
10
[root@slave2 ~]# df -h
文件系统 容量 已用 可用 已用% 挂载点
/dev/mapper/centos-root 50G 5.2G 45G 11% /
devtmpfs 1.9G 0 1.9G 0% /dev
tmpfs 1.9G 0 1.9G 0% /dev/shm
tmpfs 1.9G 8.3M 1.9G 1% /run
tmpfs 1.9G 0 1.9G 0% /sys/fs/cgroup
/dev/mapper/centos-home 73G 33M 73G 1% /home
/dev/sda1 497M 152M 346M 31% /boot
tmpfs 379M 0 379M 0% /run/user/0
  • 对新增加的硬盘空间做新增分区(硬盘数没有增加,增加的是空间)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
[root@slave2 ~]# fdisk -l

磁盘 /dev/sda:818.2 GB, 818191269888 字节,1598029824 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节
磁盘标签类型:dos
磁盘标识符:0x0009a639

设备 Boot Start End Blocks Id System
/dev/sda1 * 2048 1026047 512000 83 Linux
/dev/sda2 1026048 266338303 132656128 8e Linux LVM

磁盘 /dev/mapper/centos-swap:4177 MB, 4177526784 字节,8159232 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节


磁盘 /dev/mapper/centos-root:53.7 GB, 53687091200 字节,104857600 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节


磁盘 /dev/mapper/centos-home:78.0 GB, 77972111360 字节,152289280 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节

[root@slave2 ~]# fdisk /dev/sda

The device presents a logical sector size that is smaller than
the physical sector size. Aligning to a physical sector (or optimal
I/O) size boundary is recommended, or performance may be impacted.
欢迎使用 fdisk (util-linux 2.23.2)。

更改将停留在内存中,直到您决定将更改写入磁盘。
使用写入命令前请三思。


命令(输入 m 获取帮助):n(说明:新增分区)
Partition type:
p primary (2 primary, 0 extended, 2 free)
e extended
Select (default p): p(说明:主分区)
分区号 (3,4,默认 3):(说明:新增分区号(1,2默认已经使用))
起始 扇区 (266338304-1598029823,默认为 266338304):(说明:默认回车最小)
将使用默认值 266338304
Last 扇区, +扇区 or +size{K,M,G} (266338304-1598029823,默认为 1598029823):(说明:默认回车最大)
将使用默认值 1598029823
分区 3 已设置为 Linux 类型,大小设为 635 GiB

命令(输入 m 获取帮助):t(说明:修改分区类型)
分区号 (1-3,默认 3):3(说明:修改分区类型对应的分区号)
Hex 代码(输入 L 列出所有代码):8e(说明:8e是lvm磁盘类型)
已将分区“Linux”的类型更改为“Linux LVM”

命令(输入 m 获取帮助):p(说明:打印分区表)

磁盘 /dev/sda:818.2 GB, 818191269888 字节,1598029824 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节
磁盘标签类型:dos
磁盘标识符:0x0009a639

设备 Boot Start End Blocks Id System
/dev/sda1 * 2048 1026047 512000 83 Linux
/dev/sda2 1026048 266338303 132656128 8e Linux LVM
/dev/sda3 266338304 1598029823 665845760 8e Linux LVM

命令(输入 m 获取帮助):w(说明:保存退出)
The partition table has been altered!

Calling ioctl() to re-read partition table.

WARNING: Re-reading the partition table failed with error 16: 设备或资源忙.
The kernel still uses the old table. The new table will be used at
the next reboot or after you run partprobe(8) or kpartx(8)
正在同步磁盘。
[root@slave2 ~]#
  • 重启系统
1
[root@slave2 ~]# reboot
  • 查看硬盘情况(核对刚才所做的分区操作是否保存成功)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[root@slave2 ~]# fdisk -l

磁盘 /dev/sda:818.2 GB, 818191269888 字节,1598029824 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节
磁盘标签类型:dos
磁盘标识符:0x0009a639

设备 Boot Start End Blocks Id System
/dev/sda1 * 2048 1026047 512000 83 Linux
/dev/sda2 1026048 266338303 132656128 8e Linux LVM
/dev/sda3 266338304 1598029823 665845760 8e Linux LVM

磁盘 /dev/mapper/centos-swap:4177 MB, 4177526784 字节,8159232 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节


磁盘 /dev/mapper/centos-root:53.7 GB, 53687091200 字节,104857600 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节


磁盘 /dev/mapper/centos-home:78.0 GB, 77972111360 字节,152289280 个扇区
Units = 扇区 of 1 * 512 = 512 bytes
扇区大小(逻辑/物理):512 字节 / 4096 字节
I/O 大小(最小/最佳):4096 字节 / 4096 字节
  • 查看当前分区类型
1
2
3
[root@slave2 ~]# df -T /dev/sda1
文件系统 类型 1K-块 已用 可用 已用% 挂载点
/dev/sda1 xfs 508588 154808 353780 31% /boot
  • 创建文件系统在新的磁盘上
1
2
3
4
5
6
7
8
9
10
[root@slave2 ~]# mkfs.xfs /dev/sda3
meta-data=/dev/sda3 isize=256 agcount=4, agsize=41615360 blks
= sectsz=4096 attr=2, projid32bit=1
= crc=0 finobt=0
data = bsize=4096 blocks=166461440, imaxpct=25
= sunit=0 swidth=0 blks
naming =version 2 bsize=4096 ascii-ci=0 ftype=0
log =internal log bsize=4096 blocks=81280, version=2
= sectsz=4096 sunit=1 blks, lazy-count=1
realtime =none extsz=4096 blocks=0, rtextents=0
  • 创建pv、查看pv状态(PV组成VG,VG组成LV)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[root@slave2 ~]# pvdisplay 
--- Physical volume ---
PV Name /dev/sda2
VG Name centos
PV Size 126.51 GiB / not usable 3.00 MiB
Allocatable yes (but full)
PE Size 4.00 MiB
Total PE 32386
Free PE 0
Allocated PE 32386
PV UUID ddZZPI-6Wty-4qgS-RPMY-Gdsd-jih4-cqP0vQ

[root@slave2 ~]# pvcreate /dev/sda3
Device /dev/sda3 not found (or ignored by filtering).

[root@slave2 ~]# pvdisplay
--- Physical volume ---
PV Name /dev/sda2
VG Name centos
PV Size 126.51 GiB / not usable 3.00 MiB
Allocatable yes (but full)
PE Size 4.00 MiB
Total PE 32386
Free PE 0
Allocated PE 32386
PV UUID ddZZPI-6Wty-4qgS-RPMY-Gdsd-jih4-cqP0vQ

--- Physical volume ---
PV Name /dev/sda3
VG Name centos
PV Size 635.00 GiB / not usable 4.00 MiB
Allocatable yes
PE Size 4.00 MiB
Total PE 162559
Free PE 162559
Allocated PE 0
PV UUID OiezFh-mTSY-u0Xk-pADo-rJsc-dfT5-kfCUCF
  • 刚创建的PV加入相应的VG
1
2
3
4
5
[root@slave2 ~]# vgextend centos /dev/sda3
WARNING: xfs signature detected on /dev/sda3 at offset 0. Wipe it? [y/n]: y
Wiping xfs signature on /dev/sda3.
Physical volume "/dev/sda3" successfully created
Volume group "centos" successfully extended
  • 查看LV状态,把VG加入到LV
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
[root@slave2 ~]# lvdisplay 
--- Logical volume ---
LV Path /dev/centos/swap
LV Name swap
VG Name centos
LV UUID x2Acgw-2v1S-HrpP-rJCW-b259-RivD-qF6fdC
LV Write Access read/write
LV Creation host, time bogon, 2016-11-23 10:32:37 +0800
LV Status available
# open 2
LV Size 3.89 GiB
Current LE 996
Segments 1
Allocation inherit
Read ahead sectors auto
- currently set to 8192
Block device 253:0

--- Logical volume ---
LV Path /dev/centos/home
LV Name home
VG Name centos
LV UUID lbxRvi-O3Th-EAdc-2dKL-Pf13-YvQn-Om88WI
LV Write Access read/write
LV Creation host, time bogon, 2016-11-23 10:32:37 +0800
LV Status available
# open 1
LV Size 72.62 GiB
Current LE 18590
Segments 1
Allocation inherit
Read ahead sectors auto
- currently set to 8192
Block device 253:2

--- Logical volume ---
LV Path /dev/centos/root
LV Name root
VG Name centos
LV UUID oqgktg-XpOf-2HXX-rrLM-pGW0-ebs0-e2FR8j
LV Write Access read/write
LV Creation host, time bogon, 2016-11-23 10:32:39 +0800
LV Status available
# open 1
LV Size 50.00 GiB
Current LE 12800
Segments 1
Allocation inherit
Read ahead sectors auto
- currently set to 8192
Block device 253:1

[root@slave2 ~]# lvextend -l +162559 /dev/mapper/centos-home
Size of logical volume centos/home changed from 72.62 GiB (18590 extents) to 707.61 GiB (181149 extents).
Logical volume home successfully resized.
  • 调整文件系统大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@slave2 ~]# xfs_growfs /dev/mapper/centos-home 
meta-data=/dev/mapper/centos-home isize=256 agcount=4, agsize=4759040 blks
= sectsz=512 attr=2, projid32bit=1
= crc=0 finobt=0
data = bsize=4096 blocks=19036160, imaxpct=25
= sunit=0 swidth=0 blks
naming =version 2 bsize=4096 ascii-ci=0 ftype=0
log =internal bsize=4096 blocks=9295, version=2
= sectsz=512 sunit=0 blks, lazy-count=1
realtime =none extsz=4096 blocks=0, rtextents=0
data blocks changed from 19036160 to 185496576
[root@slave2 ~]# df -h
文件系统 容量 已用 可用 已用% 挂载点
/dev/mapper/centos-root 50G 5.2G 45G 11% /
devtmpfs 1.9G 0 1.9G 0% /dev
tmpfs 1.9G 0 1.9G 0% /dev/shm
tmpfs 1.9G 8.3M 1.9G 1% /run
tmpfs 1.9G 0 1.9G 0% /sys/fs/cgroup
/dev/mapper/centos-home 708G 34M 708G 1% /home
/dev/sda1 497M 152M 346M 31% /boot
tmpfs 379M 0 379M 0% /run/user/0

Hadoop Yarn

Yarn 简介

YARN的基本思想是将 JobTracker 的两个主要功能(资源管理和作业调度/监控)分离,主要方法是创建一个全局的ResourceManager(RM)和若干个针对应用程序的ApplicationMaster(AM)。这里的应用程序是指传统的MapReduce作业或作业的DAG(有向无环图)。

YARN 分层结构的本质是 ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager 将各个资源部分(计算、内存、带宽等)精心安排给基础 NodeManager(YARN 的每节点代理)。ResourceManager 还与 ApplicationMaster 一起分配资源,与 NodeManager 一起启动和监视它们的基础应用程序。在此上下文中,ApplicationMaster 承担了以前的 TaskTracker 的一些角色,ResourceManager 承担了 JobTracker 的角色。
ApplicationMaster 管理一个在 YARN 内运行的应用程序的每个实例。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配)。请注意,尽管目前的资源更加传统(CPU 核心、内存),但未来会带来基于手头任务的新资源类型(比如图形处理单元或专用处理设备)。从 YARN 角度讲,ApplicationMaster 是用户代码,因此存在潜在的安全问题。YARN 假设 ApplicationMaster 存在错误或者甚至是恶意的,因此将它们当作无特权的代码对待。
NodeManager 管理一个 YARN 集群中的每个节点。NodeManager 提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1 通过插槽管理 Map 和 Reduce 任务的执行,而 NodeManager 管理抽象容器,这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。YARN 继续使用 HDFS 层。它的主要 NameNode 用于元数据服务,而 DataNode 用于分散在一个集群中的复制存储服务。
要使用一个 YARN 集群,首先需要来自包含一个应用程序的客户的请求。ResourceManager 协商一个容器的必要资源,启动一个 ApplicationMaster 来表示已提交的应用程序。通过使用一个资源请求协议,ApplicationMaster 协商每个节点上供应用程序使用的资源容器。执行应用程序时,ApplicationMaster 监视容器直到完成。当应用程序完成时,ApplicationMaster 从 ResourceManager 注销其容器,执行周期就完成了。

MRv1 的缺陷

MapReduce 的第一个版本既有优点也有缺点。MRv1 是目前使用的标准的大数据处理系统。但是,这种架构存在不足,主要表现在大型集群上。当集群包含的节点超过 4,000 个时(其中每个节点可能是多核的),就会表现出一定的不可预测性。其中一个最大的问题是级联故障,由于要尝试复制数据和重载活动的节点,所以一个故障会通过网络泛洪形式导致整个集群严重恶化。
但 MRv1 的最大问题是多租户。随着集群规模的增加,一种可取的方式是为这些集群采用各种不同的模型。MRv1 的节点专用于 Hadoop,所以可以改变它们的用途以用于其他应用程序和工作负载。当大数据和 Hadoop 成为云部署中一个更重要的使用模型时,这种能力也会增强,因为它允许在服务器上对 Hadoop 进行物理化,而无需虚拟化且不会增加管理、计算和输入/输出开销。

Yarn的优点

大大减小了 JobTracker(也就是现在的 ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。
在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
对于资源的表示以内存为单位 ( 在目前版本的 Yarn 中,没有考虑 cpu 的占用 ),比之前以剩余 slot 数目更合理。
老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsMasters( 注意不是 ApplicationMaster),它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。
Container 是 Yarn 为了将来作资源隔离而提出的一个框架。这一点应该借鉴了 Mesos 的工作,目前是一个框架,仅仅提供 java 虚拟机内存的隔离,hadoop 团队的设计思路应该后续能支持更多的资源调度和控制 , 既然资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资源闲置的尴尬情况。

YARN的核心思想

将JobTracker和TaskTacker进行分离,它由下面几大构成组件:
a. 一个全局的资源管理器 ResourceManager
b. ResourceManager的每个节点代理 NodeManager
c. 表示每个应用的 ApplicationMaster
d. 每一个ApplicationMaster拥有多个Container在NodeManager上运行[2]

YARN的主要架构

ResourceManager(RM)

RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。
调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等。
应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

ApplicationMaster(AM)

用户提交的每个应用程序均包含一个AM,主要功能包括:
与RM调度器协商以获取资源(用Container表示);
将得到的任务进一步分配给内部的任务(资源的二次分配);
与NM通信以启动/停止任务;
监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
当前YARN自带了两个AM实现,一个是用于演示AM编写方法的实例程序distributedshell,它可以申请一定数目的Container以并行运行一个Shell命令或者Shell脚本;另一个是运行MapReduce应用程序的AM—MRAppMaster。
注:RM只负责监控AM,在AM运行失败时候启动它,RM并不负责AM内部任务的容错,这由AM来完成。

NodeManager(NM)

NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求

Container

Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。
注:1. Container不同于MRv1中的slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。

  1. 现在YARN仅支持CPU和内存两种资源,且使用了轻量级资源隔离机制Cgroups进行资源隔离。
    YARN的资源管理和执行框架都是按主/从范例实现的——Slave —节点管理器(NM)运行、监控每个节点,并向集群的Master—资源管理器(RM)报告资源的可用性状态,资源管理器最终为系统里所有应用分配资源。
    特定应用的执行由ApplicationMaster控制,ApplicationMaster负责将一个应用分割成多个任务,并和资源管理器协调执行所需的资源,资源一旦分配好,ApplicationMaster就和节点管理器一起安排、执行、监控独立的应用任务。
    需要说明的是, YARN不同服务组件的通信方式采用了事件驱动的异步并发机制,这样可以简化系统的设计。

YARN架构简析

集中式架构

集中式调度器(Monolithic Scheduler)的特点是,资源的调度和应用程序的管理功能全部放到一个进程中完成,开源界典型的代表是MRv1 JobTracker的实现。这样设计的缺点很明显,扩展性差:首先,集群规模受限;其次,新的调度策略难以融入到现有代码中,比如之前仅支持MapReduce作业,现在要支持流式作业,而将流式作业的调度策略嵌入到中央调度其中是一项很难的工作。

双层调度架构

为了克服集中式调度器的不足,双层调度器是一种很容易被想到的解决之道,它可看作是一种分而治之的机制或者是策略下放机制:双层调度器仍保留一个经简化的集中式资源调度器,但具体任务相关的调度策略则下放到各个应用程序调度器完成。这种调度器的典型代表是Mesos。Mesos调度器由两部分组成,分别是资源调度器和框架(应用程序)调度器,其中,资源调度器负责将集群中的资源分配给各个框架(应用程序),而框架(应用程序)调度器负责将资源进一步分配给内部的各个任务,用户很容易将一种框架或者系统接入Mesos.
双层调度器的特点是:各个框架调度器并不知道整个集群资源使用情况,只是被动地接受资源;资源调度器仅将可用的资源推送给各个框架,而由框架自己选择是使用还是拒绝这些资源;一旦框架接受到新资源,再进一步将资源分配给其内部的任务,进而实现双层调度。然而这种调度器也是有缺点,主要表现在以下两个方面:1.各个框架无法知道整个集群的实时资源使用情况;采用悲观锁,并发粒度小。

Hadoop 新 MapReduce 框架 Yarn 详解

Hadoop MapReduceV2(Yarn) 框架简介

原 Hadoop MapReduce 框架的问题

图1:Hadoop 原 MapReduce 架构
Hadoop 原 MapReduce 架构
从上图中可以清楚的看出原 MapReduce 程序的流程及设计思路:

  1. 首先用户程序(JobClient)提交了一个job,job的信息会发送到JobTracker,JobTracker是Map-reduce框架的中心,它需要与集群中的机器定时通信(heartbeat),需要管理哪些程序应该跑在哪些机器上,需要管理所有job失败、重启等操作。
  2. TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,它做的事情是监视自己所在机器的资源情况。
  3. TaskTracker 同时监视当前机器的tasks运行情况,TaskTracker需要把这些信息通过heartbeat发送给JobTracker,JobTracker会收集这些信息以给新提交的job分配运行在哪些机器上。上图虚线就是标识消息的发送-接收的过程。
    可以看得出原来的 map-reduce 架构是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:
  4. JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
    JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  5. 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
  6. 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。4. 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  7. 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

新 Hadoop Yarn 框架原理及运作机制

从业界使用分布式系统的变化趋势和 hadoop 框架的长远发展来看,MapReduce 的 JobTracker/TaskTracker 机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,hadoop 开发团队做了一些 bug 的修复,但是最近这些修复的成本越来越高,这表明对原框架做出改变的难度越来越大。

为从根本上解决旧 MapReduce 框架的性能瓶颈,促进 Hadoop 框架的更长远发展,从 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn,其架构图如下图所示:

图2:新的 Hadoop MapReduce 框架(Yarn)架构
新的 Hadoop MapReduce 框架(Yarn)架构

重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。

事实上,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。

上图中 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。

ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。

上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。

每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

新旧 Hadoop MapReduce 框架比对

让我们来对新旧 MapReduce 框架做详细的分析和对比,可以看到有以下几点显著变化:

首先客户端不变,其调用 API 及接口大部分保持兼容,这也是为了对开发使用者透明化,使其不必对原有代码做大的改变,但是原框架中核心的 JobTracker 和 TaskTracker 不见了,取而代之的是 ResourceManager, ApplicationMaster 与 NodeManager 三个部分。

我们来详细解释这三个部分,首先 ResourceManager 是一个中心的服务,它做的事情是调度、启动每一个 Job 所属的 ApplicationMaster、另外监控 ApplicationMaster 的存在情况。细心的读者会发现:Job 里面所在的 task 的监控、重启等等内容不见了。这就是 AppMst 存在的原因。ResourceManager 负责作业与资源的调度。接收 JobSubmitter 提交的作业,按照作业的上下文 (Context) 信息,以及从 NodeManager 收集来的状态信息,启动调度过程,分配一个 Container 作为 App Mstr

NodeManager 功能比较专一,就是负责 Container 状态的维护,并向 RM 保持心跳。

ApplicationMaster 负责一个 Job 生命周期内的所有工作,类似老的框架中 JobTracker。但注意每一个 Job(不是每一种)都有一个 ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。

Yarn 框架相对于老的 MapReduce 框架什么优势呢?我们可以看到:

这个设计大大减小了 JobTracker(也就是现在的 ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。
在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。
对于资源的表示以内存为单位 ( 在目前版本的 Yarn 中,没有考虑 cpu 的占用 ),比之前以剩余 slot 数目更合理。
老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsMasters( 注意不是 ApplicationMaster),它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。
Container 是 Yarn 为了将来作资源隔离而提出的一个框架。这一点应该借鉴了 Mesos 的工作,目前是一个框架,仅仅提供 java 虚拟机内存的隔离 ,hadoop 团队的设计思路应该后续能支持更多的资源调度和控制 , 既然资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资源闲置的尴尬情况。

新的 Yarn 框架相对旧 MapRduce 框架而言,其配置文件 , 启停脚本及全局变量等也发生了一些变化,主要的改变如下:

表1:新旧 Hadoop 脚本 / 变量 / 位置变化表

改变项 原框架中 新框架中(Yarm) 备注
配置文件位置 ${hadoop_home_dir}/conf ${hadoop_home_dir}/etc/hadoop Yarn 框架也兼容老的 ${hadoop_home_dir}/conf 位置配置,启动时会检测是否存在老的 conf 目录,如果存在将加载 conf 目录下的配置,否则加载 etc 下配置
启停脚本 ${hadoop_home_dir}/bin/start(stop)-all.sh ${hadoop_home_dir}/sbin/start(stop)-dfs.sh ${hadoop_home_dir}/bin/start(stop)-all.sh 新的 Yarn 框架中启动分布式文件系统和启动 Yarn 分离,启动 / 停止分布式文件系统的命令位于 ${hadoop_home_dir}/sbin 目录下,启动 / 停止 Yarn 框架位于 ${hadoop_home_dir}/bin/ 目录下
JAVA_HOME 全局变量 ${hadoop_home_dir}/bin/start-all.sh 中 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh ${hadoop_home_dir}/etc/hadoop/Yarn-env.sh Yarn 框架中由于启动 hdfs 分布式文件系统和启动 MapReduce 框架分离,JAVA_HOME 需要在 hadoop-env.sh 和 Yarn-env.sh 中分别配置
HADOOP_LOG_DIR 全局变量 不需要配置 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 老框架在 LOG,conf,tmp 目录等均默认为脚本启动的当前目录下的 log,conf,tmp 子目录,Yarn 新框架中 Log 默认创建在 Hadoop 用户的 home 目录下的 log 子目录,因此最好在 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 配置 HADOOP_LOG_DIR,否则有可能会因为你启动 hadoop 的用户的 .bashrc 或者 .bash_profile 中指定了其他的 PATH 变量而造成日志位置混乱,而该位置没有访问权限的话启动过程中会报错

由于新的 Yarn 框架与原 Hadoop MapReduce 框架相比变化较大,核心的配置文件中很多项在新框架中已经废弃,而新框架中新增了很多其他配置项,看下表所示会更加清晰:

表2:新旧 Hadoop 框架配置项变化表

配置文件 配置项 Hadoop 0.20.X配置 Hadoop 0.23.X配置 说明
core-site.xml 系统默认分布式文件URL fs.default.name fs.defaultFS
hdfs-site.xml DFS name node 存放 name table 的目录 dfs.name.dir dfs.namenode.name.dir 新框架中 name node 分成 dfs.namenode.name.dir( 存放 naname table 和 dfs.namenode.edits.dir(存放 edit 文件),默认是同一个目录
hdfs-site.xml DFS data node 存放数据block的目录 dfs.data.dir dfs.datanode.data.dir 新框架中 DataNode 增加更多细节配置,位于 dfs.datanode. 配置项下,如 dfs.datanode.data.dir.perm(datanode local 目录默认权限);dfs.datanode.address(datanode 节点监听端口);等
hdfs-site.xml 分布式文件系统数据复制块 dfs.replication dfs.replication 新框架与老框架一致,值建议配置为与分布式 cluster 中实际的 DataNode 主机数一致
mapred-site.xml Job监控地址及端口 mapred.job.tracker 新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的 mapreduce.jobtracker.jobhistory 相关配置,
mapred-site.xml 第三方MapReduce框架 mapreduce.framework.name 新框架支持第三方 MapReduce 开发框架以支持如 SmartTalk/DGSG 等非 Yarn 架构,注意通常情况下这个配置的值都设置为 Yarn,如果没有配置这项,那么提交的 Yarn job 只会运行在 locale 模式,而不是分布式模式。
yarn-site.xml The address of the applications manager interface in the RM Yarn.resourcemanager.address 新框架中 NodeManager 与 RM 通信的接口地址
yarn-site.xml The address of the scheduler interface Yarn.resourcemanager.scheduler.address 同上,NodeManger 需要知道 RM 主机的 scheduler 调度服务接口地址
yarn-site.xml The address of the RM web application Yarn.resourcemanager.webapp.address 新框架中各个 task 的资源调度及运行状况通过通过该 web 界面访问
yarn-site.xml The address of the resource tracker interface Yarn.resourcemanager.resource-tracker.address 新框架中 NodeManager 需要向 RM 报告任务运行状态供 Resouce 跟踪,因此 NodeManager 节点主机需要知道 RM 主机的 tracker 接口地址

JobTracker 和 TaskTracker 概述

概述

Hadoop MapReduce 采用 Master/Slave 结构。

  • Master:整个集群的全局管理者,功能包括:作业管理、状态监控和任务调度等,即 MapReduce 中的 JobTracker。
  • Slave:负责任务的执行和任务状态的回报,即 MapReduce 中的 TaskTracker。

JobTracker 剖析

  1. 概述:JobTracker 是一个后台服务进程,启动之后,会一直监听并接受来自各个 TaskTracker 发送的心跳信息,包括资源使用情况和任务运行情况等信息。
  2. 功能:
  • 作业控制:在 hadoop 中每个应用程序被表示成一个作业,每个作业又被分成多个任务,JobTracker 的作业控制模块则负责作业的分解和状态监控。最重要的是状态监控:主要包括TaskTracker状态监控、作业状态监控和任务状态监控。主要作用:容错和为任务调度提供决策依据。
  • 资源管理

TaskTracker 剖析

  1. 概述:TaskTracker 是 JobTracker 和 Task 之间的桥梁,一方面,从 JobTracker 接受并执行各种命令:运行任务、提交任务、杀死任务等;另一方面,将本地节点上各个任务的状态通过心跳周期性汇报给 JobTracker。TaskTracker与JobTracker和Task之间采用RPC协议进行通信。
  2. 功能:
  • 汇报心跳:Tracker周期性将所有节点上各种信息通过心跳机制汇报给JobTracker。这些信息包括两部分:
    a. 机器级别信息:节点健康情况、资源使用情况等
    b. 任务级别信息:任务执行进度、任务运行状态等
  • 执行命令:JobTracker 会给 TaskTracker 下达各种命令,主要包括:启动任务、提交任务、杀死任务、杀死作业和重新初始化。

浅谈 JobTracker 和 TaskTracker

JobTracker 对应于 NameNode。TaskTracker 对应于 DataNode。
DataNode 和 NameNode 是针对数据存放而言的,JobTracker 和 TaskTracker 是对于 MapReduce 执行而言的。

MapReduce 中有几个主要概念:JobClient、JobTracker和TaskTracker。MapReduce 整体上可以分为这么几条执行线索:

  1. JobClient 会在用户端通过 JobClient 类将应用已经配置参数打包成 jar 文件存储到 HDFS,并把路径提交到 JobTracker,然后由 JobTracker 创建每一个 Task(即MapTask和ReduceTask),并将他们分发到各个 TaskTracker 服务中去执行
  2. JobTracker 是一个 master 服务,软件启动之后 JobTracker 接受 Job,负责调度 Job 的每一个子任务 task 运行于 TaskTracker 上,并监控他们,如果发现有失败的 task 就重新运行它。一般情况下应该把 JobTracker 部署在单独的机器上。
  3. TaskTracker 是运行在多个节点上的 slaver 服务,TaskTracker 主动与 JobTracker 通信,接受作业,并负责执行每一个任务。TaskTracker 都需要运行在 HDFS 的 DataNode 上。

Hadoop MapReduce

MapReduce 是一种编程模型,用于大规模数据级(大于 1TB)的并行运算。概念“Map(映射)”和“Reduce(归约)”是它们的主要思想。谷歌在 2004 年发表了可以分析大量数据的 MapReduce 算法。每当你听到“大数据”这个词时,它指的是因为太大而让仅仅一台机器难以有效存储或分析的问题。MapReduce 通过把计算量分配给不同的计算机群,能够解决大部分和大数据有关的分析问题。Hadoop 提供了最受欢迎的利用 MapReduce 算法来管理大数据的开源方式。

所以通常来说,每当听到“大数据”,那也许意味着 Hadoop 被用来存储数据,也通常意味着数据的抽取和检索是用的 MapReduce。

拆分 MapReduce 算法

MapReduce 合并了两种经典函数:

映射(Mapping):对集合里的每个目标应用同一个操作。即,如果你相把表单里的每个单元格乘以二,那么把这个函数单独的应用到每个单元格上的操作就属于 mapping。
化简(Reducing):遍历结合中的元素来返回一个综合的结果。即,“输出表单里的一列数字的和”这个任务属于reducing。

MapReduce 算法例子

你想数一摞牌中有多少张黑桃,直观的方式是一张一张检查并且输出有多少张黑桃。MapReduce 方法则是:

  1. 给在座的所有玩家分配这摞牌
  2. 让每个玩家数自己手中的牌有几张是黑桃,然后把这个数目汇报给你
  3. 你把所有玩家告诉你的数字加起来,得到最后的结果

这不是一个严谨的例子,在这个例子中,人代表计算机,因为他们同时工作,所以他们是一个集群。在大多数实际应用中,我们假设数据已经在每台计算机上了-也就是说把牌分发并不是 MapReduce 的第一步。(事实上,在计算机集群中如何存储文件是 Hadoop 的真正核心。)
通过把牌分给多个玩家并且让他们各自数数,你就在并行执行运算,因为每个玩家都在同时计算。这同时把这项工作变成了分布式,因为多个不同的人在解决同一个问题的过程中并不需要知道他们的邻居在干什么。
通过告诉每个人数数,你对一项检查每张牌的任务进行了映射。你不会让他们把黑桃牌递给你,而是让他们把你想要的东西化简为一个数字。
如果有足够的人的话,问一些更有趣的问题就相当简单了-比如“一摞牌的平均值是什么”。你可以通过合并“所有牌的值的和是什么”及“我们有多少张牌”这两个问题来得到答案。用这个和除以牌的张数就得到了平均值。

主要功能

MapReduce 通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性;每个节点会周期性的返回它所完成的工作和最新的状态。如果一个节点保持沉默超过一个预设的时间间隔,主节点记录下这个节点状态为死亡,并把分配给这个节点的数据发到别的节点。

MapReduce 提供了以下的主要功能:

  1. 数据划分和任务调度:

系统自动将一个作业(Job)待处理的大数据划分为很多数据块,每个数据块对应于一个计算任务(Task),并自动调去计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并负责Map节点指定的同步控制。

  1. 数据/代码互定位:

为了减少数据通信,一个节本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向数据的迁移;当无法进行这种本地化处理数据处理时,再寻找其它可用节点并将数据从网络上送到该节点(数据向代码迁移),但尽可能从数据所在的本地机架上寻找可用节点以减少通信延迟。

  1. 系统优化

为了减少数据通信开销,中间结果数据进入 Reduce 节点前会进行一定的合并处理;一个 Reduce 节点所处理的数据可能会来自多个 Map 节点,为了避免 Reduce 计算阶段发生数据相关性,Map 节点输出的中间结果需要使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce 节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。

  1. 出错检测和恢复

以低端商用服务器构成的大规模MapReduce 计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce 需要能检测并隔离出错点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提高数据存储的可靠性,并能及时检测和恢复出错的数据。

Hadoop 启动、关闭

适用版本

1
2
3
4
5
6
7
[root@master hadoop]# hadoop version
Hadoop 3.0.0-alpha1
Source code repository https://git-wip-us.apache.org/repos/asf/hadoop.git -r a990d2ebcd6de5d7dc2d3684930759b0f0ea4dc3
Compiled by andrew on 2016-08-30T07:02Z
Compiled with protoc 2.5.0
From source with checksum f3a9644139eac17acbb91bfce7f68e2
This command was run using /usr/local/hadoop/share/hadoop/common/hadoop-common-3.0.0-alpha1.jar

参考地址

http://hadoop.apache.org/docs/r3.0.0-alpha1/hadoop-project-dist/hadoop-common/ClusterSetup.html#Slaves_File

Hadoop Startup

  1. 要启动 Hadooop cluster,需要同时启动 HDFS 和 YARN cluster.
    第一次打开 HDFS,它必须先格式化它。
1
$HADOOP_HOME/bin/hdfs namenode -format
  1. 如果 etc/hadoop/workers 与 ssh 信任已经配置好了,所有的 HDFS processes 可以使用单一的脚本来启动
1
$HADOOP_HOME/sbin/start-dfs.sh   #在NameNode 节点运行

也可以逐个节点来启动:

1
2
$HADOOP_HOME/bin/hdfs --daemon start namenode   #Start HDFS NameNode
$HADOOP_HOME/bin/hdfs --daemon start datanode #Start HDFS DataNode
  1. 如果 etc/hadoop/workers 与 SSH 信任已经配置好了,所有的 YARN processes 可以使用单一的脚本来启动

YARN 是从 MapReduce 中分离出来的,负责资源管理与任务调度。运行于 MapReduce 之上,提供了高可用性、高扩展性。

1
$HADOOP_HOME/sbin/start-yarn.sh

也可以逐个节点来启动:

1
2
3
$HADOOP_HOME/bin/yarn --daemon start resourcemanager    #使用如下命令运行指定的资源管理程序
$HADOOP_HOME/bin/yarn --daemon start nodemanager #在每个yarn节点启动NodeManager
$HADOOP_HOME/bin/yarn --daemon start proxyserver
  1. 在指定的服务器运行如下命令启动 MapReduce JobHistory Server

开启历史服务器,才能在 Web 中查看任务运行(注意,这里是查看,默认是全部会记录,只有开启后才能查看)。

1
$HADOOP_HOME/bin/mapred --daemon start historyserver

Hadoop ShutDown

  1. 如果 etc/hadoop/workers 与 SSH 信任已经配置好了,使用如下命令停止所有的 HDFS processes
1
$HADOOP_HOME/sbin/stop-dfs.sh    #在 NameNode 节点运行

同启动的时候一样,也可以逐个节点停止:

1
2
$HADOOP_HOME/bin/hdfs --daemon stop namenode    # 在 NameNode 节点运行
$HADOOP_HOME/bin/hdfs --daemon stop datanode # 在 DataNode 节点运行
  1. 如果 etc/hadoop/workers 与 SSH 信任已经配置好可,使用如下命令停止所有 YARN processes
1
$HADOOP_HOME/sbin/stop-yarn.sh

也可以逐个节点停止:

1
2
3
$HADOOP_HOME/bin/yarn --daemon stop resourcemanager
$HADOOP_HOME/bin/yarn --daemon stop nodemanager
$HADOOP_HOME/bin/yarn --daemon stop proxyserver
  1. 停止 MapReduce JobHistory Server
1
$HADOOP_HOME/bin/mapred --daemon stop historyserver

SSH 无密码登录节点

在配置 Hadoop 的过程中一个必要的步骤是配置 Mster 节点可以无密码 SSH 登录到各个 Slave 节点上。

如下说明配置步骤:

  1. 生成 Master 节点的公匙(在 Master 节点的终端中执行)
1
2
3
cd ~/.ssh		#如果没有该目录,先执行一次 ssh localhost
rm -rf ./id_rsa* #删除之前生成的公钥(如果有)
ssh-keygen -t rsa #一直按回车就可以
  1. 配置 Master 节点可以无密码 SSH 本机(在 Master 节点的终端中执行)
1
cat ./id_rsa.pub >>./authorized_keys

完成后可执行 ssh Master 验证一下(可能需要输入 yes,成功后执行 exit 返回原来的终端)。

  1. 在 Master 节点将公钥传送到各个 Slave 节点
1
scp ~/.ssh/id_rsa.pub root@slave1:/root/
  1. 在 Slave 节点上,将 Master SSH 公匙加入授权
1
2
3
mkdir ~/.ssh		#如果不存在该文件夹需要先创建,若存在则忽略
cat ~/id_rsa.pub >>~/.ssh/authorized_keys
rm ~/id_rsa.pub #用完就可以删除了

如果有其它 slave 节点,也要执行将 Master 公匙传送到其它 Slave 节点并授权的步骤。


至此,在 Master 节点上就可以无密码 SSH 到各个 Slave 节点了。可以在 Master 节点上执行如下命令进行检验:

1
ssh slave1