GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.

groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.

우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.

한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.

PairRDDFunctions.scala 를 살펴 보면 다음과 같다.

groupByKey ()

  • 매우 비싼 동작

  • 그룹의 순서는 보장하지 않는다. 그리고 key는 메모리에 존재하고, 만약 키가 너무 많으면 OutofMemory가 발생한다.

**

  • Group the values for each key in the RDD into a single sequence. Allows controlling the
  • partitioning of the resulting key-value pair RDD by passing a Partitioner.
  • The ordering of elements within each group is not guaranteed, and may even differ
  • each time the resulting RDD is evaluated.
  • @note This operation may be very expensive. If you are grouping in order to perform an
  • aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey
  • or PairRDDFunctions.reduceByKey will provide much better performance.
  • @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
  • key in memory. If a key has too many values, it can result in an OutOfMemoryError.
  • /
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }

reduceByKey()

MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

테스트 예제 코드

전체 코드는 다음과 같다.

import org.apache.spark.sql.SparkSession

object Main{

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkUtil.getInstance();
    //예제 데이터 만들기
    val studentRDD = sparkSession.sparkContext.parallelize(Array(
      ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
      ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
      ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
      ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
      ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
      ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
      ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
      ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
      ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
      ("Juan", "Biology", 60)), 3);

    studentRDD.foreach(println);
    //RDD에서의 groupByData
    val groupBydata = studentRDD
      .groupBy((name)=> name._1);

    groupBydata.foreach(println)
    /* groupBydata 결과
    (Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))

    */


    // Pair RDD!!
    // pairRDD 로 변경하기 
    val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));

    //reduceByKey와 groupByKey 비교 

    //reduceByData
    val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);

    reduceByData.foreach(println);
    /*    reduceByData 결과
    (Tina,306)
    (Thomas,345)
    (Jackeline,306)
    (Joseph,330)
    (Juan,256)
    (Jimmy,308)
    (Cory,260)

    */


    //groupByData
    val groupByData = pairStudentSubjectScore.groupByKey();
    groupByData.foreach(println)
    /* groupByData 결과
        (Tina,CompactBuffer(78, 73, 68, 87))
      (Juan,CompactBuffer(63, 69, 64, 60))
      (Jimmy,CompactBuffer(69, 62, 97, 80))
      (Cory,CompactBuffer(56, 65, 71, 68))
      (Thomas,CompactBuffer(87, 93, 91, 74))
      (Jackeline,CompactBuffer(86, 62, 75, 83))
      (Joseph,CompactBuffer(83, 74, 91, 82))
    */

    val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))

    groupByDataSum.foreach(println)
        /* groupByDataSum 결과
        (Juan,256)
      (Thomas,345)
      (Jackeline,306)
      (Joseph,330)
      (Jimmy,308)
      (Cory,260)
      (Tina,306)
        */

    sparkSession.close();

  }

}

요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .

참고

  1. https://thebook.io/006908/part01/ch04/03/03/01/

  2. https://intellipaat.com/community/7677/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combinebykey

  3. https://www.streamhub.co.uk/apache-spark-tuning-manual/?fbclid=IwAR3xe6qYN0DN2FIZo0sdoPr1UjLYIL9QofCtgzcDcEAHGTZc5KOiTTy-ASs

+ Recent posts