GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.
groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.
우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.
한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.
PairRDDFunctions.scala 를 살펴 보면 다음과 같다.
groupByKey ()
**
- Group the values for each key in the RDD into a single sequence. Allows controlling the
- partitioning of the resulting key-value pair RDD by passing a Partitioner.
- The ordering of elements within each group is not guaranteed, and may even differ
- each time the resulting RDD is evaluated.
- @note This operation may be very expensive. If you are grouping in order to perform an
- aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey
- or PairRDDFunctions.reduceByKeywill provide much better performance.
- @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
- key in memory. If a key has too many values, it can result in an OutOfMemoryError.
- /
 def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
 // groupByKey shouldn't use map side combine because map side combine does not
 // reduce the amount of data shuffled and requires all map side data be inserted
 // into a hash table, leading to more objects in the old gen.
 val createCombiner = (v: V) => CompactBuffer(v)
 val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
 val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
 val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
 createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
 bufs.asInstanceOf[RDD[(K, Iterable[V])]]
 }
 
reduceByKey()
MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.
  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명
  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
테스트 예제 코드
전체 코드는 다음과 같다.
import org.apache.spark.sql.SparkSession
object Main{
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkUtil.getInstance();
    //예제 데이터 만들기
    val studentRDD = sparkSession.sparkContext.parallelize(Array(
      ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
      ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
      ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
      ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
      ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
      ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
      ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
      ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
      ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
      ("Juan", "Biology", 60)), 3);
    studentRDD.foreach(println);
    //RDD에서의 groupByData
    val groupBydata = studentRDD
      .groupBy((name)=> name._1);
    groupBydata.foreach(println)
    /* groupBydata 결과
    (Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))
    */
    // Pair RDD!!
    // pairRDD 로 변경하기 
    val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));
    //reduceByKey와 groupByKey 비교 
    //reduceByData
    val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);
    reduceByData.foreach(println);
    /*    reduceByData 결과
    (Tina,306)
    (Thomas,345)
    (Jackeline,306)
    (Joseph,330)
    (Juan,256)
    (Jimmy,308)
    (Cory,260)
    */
    //groupByData
    val groupByData = pairStudentSubjectScore.groupByKey();
    groupByData.foreach(println)
    /* groupByData 결과
        (Tina,CompactBuffer(78, 73, 68, 87))
      (Juan,CompactBuffer(63, 69, 64, 60))
      (Jimmy,CompactBuffer(69, 62, 97, 80))
      (Cory,CompactBuffer(56, 65, 71, 68))
      (Thomas,CompactBuffer(87, 93, 91, 74))
      (Jackeline,CompactBuffer(86, 62, 75, 83))
      (Joseph,CompactBuffer(83, 74, 91, 82))
    */
    val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))
    groupByDataSum.foreach(println)
        /* groupByDataSum 결과
        (Juan,256)
      (Thomas,345)
      (Jackeline,306)
      (Joseph,330)
      (Jimmy,308)
      (Cory,260)
      (Tina,306)
        */
    sparkSession.close();
  }
}
요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .
참고
- https://thebook.io/006908/part01/ch04/03/03/01/ 
- https://intellipaat.com/community/7677/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combinebykey 
- https://www.streamhub.co.uk/apache-spark-tuning-manual/?fbclid=IwAR3xe6qYN0DN2FIZo0sdoPr1UjLYIL9QofCtgzcDcEAHGTZc5KOiTTy-ASs