Hadoop集群(六)—— Hive 优化及高可用

caroly 2020年08月27日 64次浏览

核心思想:把 Hive SQL 当作 MapReduce 程序去优化。

以下 SQL 不会转为 MapReduce 来执行:

  • select 仅查询本表字段。
  • where仅对本表字段做条件过滤。

Hive 优化

# 对简单的 不需要聚合的 类似 select <col> from <table> LIMIT n 语句,不需要起 MR Job,直接通过 Fetch task 获取数据
set hive.fetch.task.conversion=more;
# 开启本地模式
set hive.exec.mode.local.auto=true;
# 表示加载文件的最大值,默认为 128M,若大于该配置仍会以集群方式来运行
hive.exec.mode.local.auto.inputbytes.max
# 开启并行模式
set hive.exec.parallel=true;
# 一次 SQL 计算中允许并行执行的 job 个数的最大值,默认值为 8
hive.exec.parallel.thread.number
# 开启严格模式,默认为 nonstrict 非严格模式
set hive.mapred.mode=strict;

Hive 排序

  • Order By:对于查询结果做全排序,只允许有一个 reduce 处理。(当数据量较大时,应慎用。严格模式下,必须结合 limit 来使用)
  • Sort By:对于单个 reduce 的数据进行排序。
  • Distribute By:分区排序,经常和 Sort By 结合使用。
  • Cluster By:相当于 Sort By + Distribute By。(Cluster By 不能通过 asc、desc的方式指定排序规则;可通过 distribute by column sort by column asc | desc 的方式)

Hive Join

启用自动的 Map join:

# 该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用 Map join
set hive.auto.convert.join = true;
# 大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行,默认为 25M
hive.mapjoin.smalltable.filesize;  
# 默认值:true;是否忽略 mapjoin hint 即 mapjoin 标记
hive.ignore.mapjoin.hint;
# 默认值:true;将普通的 join 转化为普通的 mapjoin 时,是否将多个 mapjoin 转化为一个 mapjoin
hive.auto.convert.join.noconditionaltask;
# 将多个 mapjoin 转化为一个 mapjoin 时,其表的最大值,默认为 10M
hive.auto.convert.join.noconditionaltask.size;
  • Hive Join:尽可能使用相同的连接键(会转化为一个 MapReduce 作业)
  • 大表 join 大表:
    • 空 key 过滤:有时 join 超时是因为某些 key 对应的数据太多,而相同 key 对应的数据都会发送到相同的 reduce 上,从而导致内存不够。此时我们应该仔细分析这些异常的 key,很多情况下,这些 key 对应的数据是异常数据,我们需要在 SQL 语句中进行过滤。
    • 空 key 转换:有时虽然某个 key 为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在 join 的结果中,此时我们可以将表 a 中 key 为空的字段赋一个随机的值,使得数据随机均匀地分布到不同的 reduce 上。

Map-Side 聚合

开启在 Map 端的聚合:

# 将顶层的聚合操作放在 Map 阶段执行,从而减轻清洗阶段数据传输和 Reduce 阶段的执行时间,提升总体性能。该设置会消耗更多的内存。
set hive.map.aggr=true;
# map端group by执行聚合时处理的多少行数据(默认:100000)
hive.groupby.mapaggr.checkinterval;
# 进行聚合的最小比例(预先对100000条数据做聚合,若聚合之后的数据量/100000的值大于该配置0.5,则不会聚合)
hive.map.aggr.hash.min.reduction;
# map 端聚合使用的内存的最大值
hive.map.aggr.hash.percentmemory;
# map 端做聚合操作是 hash 表的最大可用内容,大于该值则会触发 flush
hive.map.aggr.hash.force.flush.memory.threshold;
# 是否对 Group By 产生的数据倾斜做优化,默认为 false
hive.groupby.skewindata;

合并小文件

  • 文件数目小,容易在文件存储端造成压力,给 hdfs 造成压力,影响效率。
  • 设置合并属性:
    • 是否合并 map 输出文件:hive.merge.mapfiles=true;
    • 是否合并 reduce 输出文件:hive.merge.mapredfiles=true;
    • 合并文件的大小:hive.merge.size.per.task=25610001000;
  • 去重统计:数据量小的时候无所谓,数据量大的情况下,由于 count distinct 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成,一般 count distinct 使用先 group by 再 count 的方式替换。

控制 Hive 中 Map 以及 Reduce 的数量

  • Map 数量相关的参数:
    • mapred.max.split.size:一个split的最大值,即每个map处理文件的最大值,默认为 256M
    • mapred.min.split.size.per.node:一个节点上split的最小值,默认为 1
    • mapred.min.split.size.per.rack:一个机架上split的最小值,默认为 1
  • Reduce 数量相关的参数:
    • mapred.reduce.tasks:强制指定 reduce 任务的数量。默认为 1
    • hive.exec.reducers.bytes.per.reducer:每个 reduce 任务处理的数据量。默认为 256M
    • hive.exec.reducers.max:每个任务最大的 reduce 数。默认为 1009

Hive JVM 重用

  • 适用场景:
    • 小文件个数过多
    • task 个数过多
  • 通过 set mapred.job.reuse.jvm.num.tasks=n; 来设置
    • n 为 task 插槽个数
  • 缺点:设置开启后,task 插槽会一直占用资源,不论是否有 task 运行,直到所有的 task 即整个 job 全部执行完成时,才会释放所有的 task 插槽资源。

Hive 高可用

环境如下:

-caroly01caroly02caroly03caroly04
Zookeeper
Hiveserver2
beeline

在『caroly02』的hist-site.xml 中增加如下节点:

<property>
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
</property>
<property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver2_zk</value>
</property>
<property>
    <name>hive.zookeeper.quorum</name>
    <value>caroly02:2181,caroly03:2181,caroly04:2181</value>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
</property>
<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>caroly02</value>
</property>
<property>
    <name>hive.server2.thrift.port</name>
    <value>10001</value> 
</property>


在『caroly04』的hist-site.xml 中增加如下节点:

<property>
    <name>hive.server2.support.dynamic.service.discovery</name>
    <value>true</value>
</property>
<property>
    <name>hive.server2.zookeeper.namespace</name>
    <value>hiveserver2_zk</value>
</property>
<property>
    <name>hive.zookeeper.quorum</name>
    <value>caroly02:2181,caroly03:2181,caroly04:2181</value>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
</property>
<property>
    <name>hive.server2.thrift.bind.host</name>
    <value>caroly04</value>
</property>
<property>
    <name>hive.server2.thrift.port</name>
    <value>10001</value> 
</property>

访问

beeline

!connect jdbc:hive2://caroly02,caroly03,caroly04/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk root 123


jdbc
public class HiveJdbcClient2 {

	private static String driverName = "org.apache.hive.jdbc.HiveDriver";

	public static void main(String[] args) throws SQLException {
		try {
			Class.forName(driverName);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}

		Connection conn = DriverManager.getConnection("jdbc:hive2://caroly02,caroly03,caroly04/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk", "root", "");
		Statement stmt = conn.createStatement();
		String sql = "select * from psn";
		ResultSet res = stmt.executeQuery(sql);
		while (res.next()) {
			System.out.println(res.getString(1));
		}
	}
}

压缩

开启『map』输出阶段压缩可以减少『job』中『map』和『Reduce task』间数据传输量。

# 开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
# 开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;
# 设置mapreduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;

当『Hive』将输出写入到表中时,输出内容同样可以进行压缩。属性『hive.exec.compress.output』控制着这个功能。用户可能需要保持默认设置文件中的默认值false,这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这个值为true,来开启输出结果压缩功能。

# 开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
# 开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
# 设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
# 设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
# 测试一下输出结果是否是压缩文件
insert overwrite local directory '/root/data' select * from aaaa;

文件存储

  • 行存储:查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。例如:TEXTFILE 和SEQUENCEFILE。
    • TEXTFILE:默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用(系统自动检查,执行查询时自动解压),但使用这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作。
  • 列存储:因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。例如:ORC 和 PARQUET。
    • ORC:Orc (Optimized Row Columnar)是 hive 0.11 版里引入的新的存储格式。可以看到每个 Orc 文件由 1 个或多个 stripe 组成,每个 stripe 250MB 大小,这个 Stripe 实际相当于 RowGroup 概念,不过大小由 4MB->250MB,这样应该能提升顺序读的吞吐率。每个 Stripe 里有三部分组成,分别是 Index Data, Row Data, Stripe Footer。
      • Index Data:一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引应该只是记录某行的各字段在 Row Data 中的 offset。
      • Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。
      • Stripe Footer:存的是各个 Stream 的类型,长度等信息。每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column 的数据类型信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个 Stripe 信息,再读各个 Stripe,即从后往前读。
    • PARQUET:Parquet 是面向分析型业务的列式存储格式。Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。通常情况下,在存储 Parquet 数据的时候会按照 Block 大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度。