原创文章,同步首发自作者个人博客转载请务必在文章开头处注明出处。
INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )
ELSE CAST(id/100 AS INTEGER)
END,
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;
groupByKey(12)对id分组处理,且Shuffle并行度为12。代码如下
public class SparkDataSkew {
public static void main(String args) {
SparkSession sparkSession = SparkSession.builder()
.appName("SparkDataSkewTunning")
.config("hive.metastore.uris", "thrift://hadoop1:9083")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> dataframe = sparkSession.sql( "select * from test");
dataframe.toJavaRDD()
.mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1)))
.groupByKey(12)
.mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {
int id = tuple._1();
AtomicInteger atomicInteger = new AtomicInteger(0);
tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
return new Tuple2<Integer, Integer>(id, atomicInteger.get());
}).count();
sparkSession.stop();
sparkSession.close();
}
}
spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar
groupByKey(48)将Shuffle并行度调整为48,重新提交到Spark。新的Job的GroupBy Stage所有Task状态如下图所示。
groupByKey(11)将Shuffle并行度调整为11,重新提交到Spark。新Job的GroupBy Stage的所有Task状态如下图所示。
spark.default.parallelism设置。如果是Spark SQL,还可通过
SET spark.sql.shuffle.partitions=[num_tasks]
设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。groupByKey算子上,使用自定义的
Partitioner(实现如下)
.groupByKey(new Partitioner() {
@Override
public int numPartitions() {
return 12;
}
@Override
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
return (id - 9500000) / 12;
} else {
return id % 12;
}
}
})
INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )
ELSE CAST(id/10 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;
INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;
CACHE TABLE test_new将小表test_new进行cache。现通过如下SQL进行Join。
CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;
SET spark.sql.autoBroadcastJoinThreshold=104857600;将Broadcast的阈值设置得足够大。
SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;
INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
ELSE CAST(id/100 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;
INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/100 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;
public class SparkDataSkew{
public static void main(String args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});
JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});
leftRDD.join(rightRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()))
.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});
javaSparkContext.stop();
javaSparkContext.close();
}
}
public class SparkDataSkew{
public static void main(String args) {
int parallelism = 48;
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});
JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});
String skewedKeyArray = new String{"9500048", "9500096"};
Set<String> skewedKeySet = new HashSet<String>();
List<String> addList = new ArrayList<String>();
for(int i = 1; i <=24; i++) {
addList.add(i + "");
}
for(String key : skewedKeyArray) {
skewedKeySet.add(key);
}
Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);
JavaPairRDD<String, String> leftSkewRDD = leftRDD
.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));
JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
.map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
.collect(Collectors.toList())
.iterator()
);
JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
.join(rightSkewRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));
JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));
JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));
skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});
javaSparkContext.stop();
javaSparkContext.close();
}
}
public class SparkDataSkew {
public static void main(String args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("ResolveDataSkewWithNAndRandom");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});
JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});
List<String> addList = new ArrayList<String>();
for(int i = 1; i <=48; i++) {
addList.add(i + "");
}
Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);
JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2()));
JavaPairRDD<String, String> rightNewRDD = rightRDD
.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
.map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
.collect(Collectors.toList())
.iterator()
);
JavaPairRDD<String, String> joinRDD = leftRandomRDD
.join(rightNewRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));
joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});
javaSparkContext.stop();
javaSparkContext.close();
}
}
本文为 @ 21CTO 创作并授权 21CTO 发布,未经许可,请勿转载。
内容授权事宜请您联系 webmaster@21cto.com或关注 21CTO 公众号。
该文观点仅代表作者本人,21CTO 平台仅提供信息存储空间服务。