目 录CONTENT

文章目录

数工总结 | 组件调优

Wissy
2024-09-13 / 0 评论 / 0 点赞 / 102 阅读 / 0 字

一、Hadoop

1.1 Map 端

  • 增大环形缓冲区大小
  • 增大环形缓冲区的溢写比例
  • 使用 Combiner 预聚合

1.2 Reduce 端

  • 合理设置 Map 和 Reduce 的个数

    太多:会导致任务间竞争资源,造成任务等待超时。

    太少:会造成 Task 等待。

  • 设置 Map、Reduce 共存

  • 规避使用 Reduce

  • 增加每个 Reduce 去 Map 中拿数的并行度

  • 增大 Reduce 的资源

  • 使用压缩,减少 IO(LZO、Snappy)

1.3 数据倾斜

  • Map 端进行预聚合
  • 二次 MR(局部聚合 + 全局聚合)
  • 增大 Reduce 的并行度
  • 自定义分区器

二、Hive

2.1 Map 端

  • 合并小文件
  • 合理设置 Map 数据
  • 开启 JVM 重用
  • 调节 Map Task 的内存和 CU 资源大小

2.2 Reduce

  • 合理设置 Reduce 数(生产文件数据大小)
  • 调节 Reduce Task 的内存和 CU 资源大小

2.3 Join

  • 大小表(Map Join)
  • 大表大表(Sort Merge Bucket (SMB) Join)

2.4 数据倾斜优化

  • count(distinct xxx) 改为 group by+count

  • 开启 skew 参数(partition 打散-预聚合 + 全局聚合)

  • Reduce 倾斜

    • 过滤多余的数据 -- 不应影响业务数据
    • key 打散,二次 MR
  • Join

    • 大小表:Map Join

    • 大表大表

      • 单 Key:拆分 + union all
      • 多 Key:打散 + 膨胀 + 局部 + 全局聚合

2.5 CBO 优化

  • 谓词下推

三、Spark

3.1 资源优化

3.1.1 内存模型

可用内存

  • 统一内存(60%)-- 动态

    • 存储内存(Storage):广播变量、cache 缓存
    • 执行内存(Execution):shuffle 产出的中间数据(可强制回收)
  • 其他内存(40%):Spark 元数据或者自定义数据结构

预留内存:300M

3.1.2 资源优化

内存优化

每个 Executor 内存

  • Storage 内存=广播变量 +(cache/Executor 数量)
  • Executor 内存=Executor 核数(并发)*(总文件大小/并行度)
  • Other 内存=自定义数据结构 * 每个 Executor 核数

案例:executor=4*100G/200=2G。100G 数据,并行度 200 个,每个 executor 4 个 core。经验值:128M -> 1G 内存

CPU 资源优化

1 core 处理 1-3 task

1 task 处理 1-5G 数据

参照:Spark 官方文档,建议设置并行度为总 core 的 2-3 倍为最佳

案例:已有资源 100G 数据,excutors:3,excutors-cores:4。并行度的设置=34(1~2)=12-24 个为合理。

3.2 并行度优化

  • 调整算子并行度(repartition、coalecse、reduceByKey 和 groupByKey 等)
  • 自定义分区器

3.3 代码优化

  • 尽量复用 RDD,避免创建重复的 RDD
  • 对重复使用的 RDD 使用持久化
  • 避免使用 shuffle 类算子(广播 join)
  • 使用序列化(Kryo)

3.4 数据倾斜优化

  • 单表-key 打散,二次聚合
  • Join-大小表-广播 Join
  • Join-大大表-过滤大 key+union,局部加全局
  • Join-大大表-大表打散-小表扩容,局部加全局

3.5 Map 端

  • Map 端预聚合(框架自己已经处理)

  • 读取小文件优化

    • 每个分区最大字节数:默认 128M。
    • 打开文件开销:默认 4M(设置为接近小文件的大小)。
    • 最大切片大小:min(分区最大字节数默认值),max(文件开销,计算数)
    • 计算数=总文件大小/分区数
    • 总文件大小=小文件数据总大小 + 小文件数量*文件开销(4M)
  • 增加 Map 溢写时输出流的缓冲区(buffer)(默认:32KB)(5M,申请 2*前内存(无)-> 溢写)

3.6 Reduce 端

合理设置 Reduce 的数据
  1. reduce 数量=shuffle 后的分区数=并行度(和 core&并行度有关)
  2. 并行度和并发度,正常设置为:并行度=2-3 倍的并发度
小文件优化
  1. Spark SQL 文件数=shuffle 的并行度=默认 200

  2. Join 后的结果插入新表

    • 使用 coalesce 算子缩小分区数(不影响 shuffle 的并行度)
    • 调整 shuffle 的并行(会影响 shuffle 的并行度)
  3. 动态分区插入数据

    • 没有 shuffle:主动 shuffle,使用 distribute by 分区字段进行文件合并(注意倾斜,有大 Key,打散拆分多个文件)
    • 有 shuffle
其他优化
  1. 增大 reduce 端的拉取次数
  2. 合理利用 bypass -- 避免排序消耗(reduce read task(shuffle 分区数)<200+ 避免 map 端预聚合(map-side aggregation))

3.7 语法优化

  • RBO-谓词下推
  • RBO-行列过滤
  • RBO-常量替换(先计算出常量值,其他不计算)
  • 广播 Join
  • CBO
  • SMB Join(桶相等,Join 列=桶列)

3.8 Spark 3.0 AQE

  • 动态合并 Shuffle 分区
  • 动态切换 Join 策略
  • 动态优化倾斜连接(Skew Joins)

四、Flink

4.1 特征

  • 流批一体化(无界流,有界流)

  • Exactly-Once 状态一致性

    • 两阶段提交 + 状态保存
    • 端到端的精准一致性语义
  • 状态管理

  • 时间处理

  • 支持窗口

  • 高吞吐、高性能、低时延的实时流处理引擎

  • 部署灵活性

4.2 内存优化

实际任务调整,默认:1core,4G。

4.3 并行度优化

Source 端

和 Kafka 分区一致

转换算子
  • KeyBy 之前(一般不处理)
  • KeyBy 之后(调整为 2 的整数次方)
Sink 端
  • 输出到 Kafka(分区数)
  • 数据库-性能评估

4.4 RocksDB 大状态优化

基于内存 + 磁盘,和 Hbase 存储相似。

读写顺序:

  • 读:先 cache,再读磁盘。

  • 写:先内存,再刷写磁盘。

优化点:

  • 开启增量检查点
  • 开启本地恢复

4.5 反压优化

  • 调整资源
  • 调整并行度

4.6 数据倾斜优化

  • KeyBy 之后聚合-开启 MiniBatch+Local
  • KeyBy 之前聚合-随机打撒-二次聚合
  • KeyBy 后的开窗聚合-随机打散-二次窗口聚合

4.7 Flink SQL 优化

  • 设置空闲状态保存时间(TTL(36 小时))
  • 开启 MiniBatch(时间(5s)和数据(20000))
  • 开启 count(distinct) 参数优化
  • 开启窗口 MiniBatch 优化

0

评论区