GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.
groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.
우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.
한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.
PairRDDFunctions.scala 를 살펴 보면 다음과 같다.
groupByKey ()
**
- Group the values for each key in the RDD into a single sequence. Allows controlling the
- partitioning of the resulting key-value pair RDD by passing a Partitioner.
- The ordering of elements within each group is not guaranteed, and may even differ
- each time the resulting RDD is evaluated.
- @note This operation may be very expensive. If you are grouping in order to perform an
- aggregation (such as a sum or average) over each key, using
PairRDDFunctions.aggregateByKey
- or
PairRDDFunctions.reduceByKey
will provide much better performance.
- @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
- key in memory. If a key has too many values, it can result in an
OutOfMemoryError
.
- /
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
reduceByKey()
MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
테스트 예제 코드
전체 코드는 다음과 같다.
import org.apache.spark.sql.SparkSession
object Main{
def main(args: Array[String]): Unit = {
val sparkSession = SparkUtil.getInstance();
val studentRDD = sparkSession.sparkContext.parallelize(Array(
("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
("Juan", "Biology", 60)), 3);
studentRDD.foreach(println);
val groupBydata = studentRDD
.groupBy((name)=> name._1);
groupBydata.foreach(println)
val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));
val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);
reduceByData.foreach(println);
val groupByData = pairStudentSubjectScore.groupByKey();
groupByData.foreach(println)
val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))
groupByDataSum.foreach(println)
sparkSession.close();
}
}
요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .
참고
https://thebook.io/006908/part01/ch04/03/03/01/
https://intellipaat.com/community/7677/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combinebykey
https://www.streamhub.co.uk/apache-spark-tuning-manual/?fbclid=IwAR3xe6qYN0DN2FIZo0sdoPr1UjLYIL9QofCtgzcDcEAHGTZc5KOiTTy-ASs