Spark数据倾斜
数据倾斜产生的两个条件
1.key分布不均
2.产生了shuffle
解决数据倾斜
1. 使用Hive ETL预处理数据
将数据倾斜提前到Hive中,因为Hive底层是MapReduce,MR比较稳定。
没有解决数据倾斜,MR相对稳定点,不会报错,spark出现了数据倾斜,跑很久都跑不出来,就会报错。
2. 过滤少数导致倾斜的key
前提是导致倾斜的key对业务没有影响。
package optimize
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/** * @author 郭帅帅 * @2022-01-09-8:52 * */
object Demo8FilterKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("data/word")
println("第一个RDD分区数量:" + lines.getNumPartitions)
val countRDD: RDD[(String, Int)] = lines
.flatMap(_.split(","))
.map((_, 1))
.groupByKey()
.map(x => (x._1, x._2.toList.sum))
println("聚合之后RDD分区的数量" + countRDD.getNumPartitions)
// countRDD.foreach(println)
/** * 采样key ,g过滤掉导致数据倾斜并且对业务影响不大的key * */
val wordRDD: RDD[(String, Int)] = lines
.flatMap(_.split(","))
.map((_, 1))
val top1: Array[(String, Int)] = wordRDD
.sample(true, 0.1)
.reduceByKey(_ + _)
.sortBy(-_._2)
.take(1)
//导致数据倾斜额key
val key: String = top1(0)._1
//过滤导致倾斜的key
wordRDD
.filter(t => !"null".equals(t._1))
.groupByKey()
.map(x => (x._1, x._2.toList.sum))
.foreach(println)
while (true) {
}
}
}
3. 提高shuffle操作的并行度
提高并行度,每一个分区中分到的数据量会减少,可以一定程度缓解数据倾斜。
4. 双重聚合 (解决Key分布不均的问题)
- 给每一个Key增加随机前缀,聚合一次。
- 再去掉随机前缀,再去掉一次。
- 不适合做太复杂的业务,代码实现很复杂
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
/** * @author 郭帅帅 * @2022-01-09-8:55 * */
class Demo9DoubleReduce {
/** * 双重聚合 * 一般适用于 业务不复杂的情况 * */
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")
val sc: SparkContext = new SparkContext(conf)
val lines: RDD[String] = sc.textFile("data/word")
val wordRDD: RDD[String] = lines
.flatMap(_.split(","))
.filter(!_.equals(""))
// 对每一个key打上随机5以内前缀
wordRDD.map(word => {
val pix: Int = Random.nextInt(5)
(pix + "-" + word, 1)
})
.groupByKey() //第一次聚合
.map(t => (t._1, t._2.toList.sum))
.map(t => {
///去掉随机前缀
(t._1.split("-")(1), t._2)
})
.groupByKey() //第二次聚合
.map(t => (t._1, t._2.toList.sum))
.foreach(println)
while (true) {
}
}
}
5. 将reduce join转为map join
MapJoin避免了shuffle
只适合大表关联小表(<1G)
将小表广播出去
6. 采样倾斜key并分拆join操作(双重join)
如果是两个大表关联,有一个表其中部分Key发生了数据倾斜,怎么办?
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo10Doublejoin {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val dataList1 = List(
("java", 1),
("shujia", 2),
("shujia", 3),
("shujia", 1),
("shujia", 1))
val dataList2 = List(
("java", 100),
("java", 99),
("shujia", 88),
("shujia", 66))
val RDD1: RDD[(String, Int)] = sc.parallelize(dataList1)
val RDD2: RDD[(String, Int)] = sc.parallelize(dataList2)
//采样倾斜的key
val sampleRDD: RDD[(String, Int)] = RDD1.sample(false, 1.0)
//skewedKey 导致数据倾斜的key
val skewedKey: String = sampleRDD.map(x => (x._1, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey(ascending = false)
.take(1)(0)._2
//导致数据倾斜key的RDD
val skewedRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {
tuple._1.equals(skewedKey)
})
//没有倾斜的key
val commonRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {
!tuple._1.equals(skewedKey)
})
val skewedRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {
tuple._1.equals(skewedKey)
})
val commonRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {
!tuple._1.equals(skewedKey)
})
val n = 2
//对产生数据倾斜的key 使用mapjoin
val skewedMap: Map[String, Int] = skewedRDD2.collect().toMap
val bro: Broadcast[Map[String, Int]] = sc.broadcast(skewedMap)
val resultRDD1: RDD[(String, (Int, Int))] = skewedRDD1.map(kv => {
val word: String = kv._1
val i: Int = bro.value.getOrElse(word, 0)
(word, (kv._2, i))
})
//没有数据倾斜的RDD 正常join
val resultRDD2: RDD[(String, (Int, Int))] = commonRDD1.join(commonRDD2)
//将两个结果拼接
resultRDD1.union(resultRDD2)
.foreach(println)
}
}
7. 使用随机前缀和扩容RDD进行join(X 会导致数据膨胀)
如果两个都是大表,同时两个表的Key都分布不均。
文章评论