Spark Core 调优指南

sparksql在广播小表的时候,有时候单纯设置spark.sql.autoBroadcastJoinThreshold并不能将小表广播出去,还需要搭配cachetable才行。但是有时候单纯设置spark.sql.autoBroadcastJoinThreshold又可以。请问:为什么会出现这种情况.

1 体系

体系

2 配置

  • 资源分配

    • num-executors:executor的个数
    • executor-cores:cpu core 的两倍
    • executor-memory:每个executor的内存大小
    • driver-memory:driver的内存大小
  • 并行度

    • spark.default.parallelism
    • spark.sql.partitions
    • repartition(num)
  • 内存使用

    • spark.storage.memoryFraction:用于cache的内存比例
    • spark.shuffle.memoryFraction:shffule阶段的缓存占内存比例

3 代码

  • 不要重复创建RDD
  • 重复使用的RDD进行cache
  • 使用高性能算子
    • mapPartition代替map
    • foreachPartition代替foreach
    • 用reduceByKey代替groupByKey
  • filter以后使用coalesce减少小任务
  • 广播大变量:sc.broadcast

4 数据

  • 序列化
    • 使用KryoSerializer代替Java序列化
  • 文件格式
    • 使用parquet文件格式,列式存储,读取效率高

5 倾斜

  • 澳门贵宾会娱乐官网,聚合(xxByKey)

    • 造成倾斜的Key数量小且不重要

      • 抽样+过滤
    • 造成倾斜的Key数量多且重要

      • 增加并行度
      • 局部聚合+全局聚合给每个Key加上前缀,聚合
      • 对上步聚合结果的Key去前缀,聚合
  • 连接

    • 小表连接大表

      • 将reduce join 转成map join
      • 使用广播变量将小表数据进行广播
      • SparkSQL设置spark.sql.autoBroadcastJoinThreshold,默认10m
    • 大表连接大表

      • 造成倾斜的Key不多

        • 对RDD1进行sample找出造成倾斜的Key
        • 分别对RDD1和RDD2进行filter将其分成skewRDD1和commonRDD1以及skewRDD1和commonRDD2
        • 然后对skewRDD1的key添加随机前缀n,对skewRDD2进行n倍扩容,然后join,再对结果的key进行前缀移除得到joinRDD1
        • 将commonRDD1和commonRDD2进行连接,得到joinRDD2
          joinRDD1.union(joinRDD2)
      • 造成倾斜的Key多

        • 对RDD1进行随机前缀n的添加
        • 对RDD2进行n倍扩容
        • 然后进行连接
        • 进行随机前缀的移除处理得到结果

发表评论

电子邮件地址不会被公开。 必填项已用*标注