SparkSession 에서 read를 하게 되면 DataFrame, Dataset으로 생성할 수 있고, RDD로 데이터를 생성하려면 SparkSession.sparkContext를 사용해서 생성해야한다.



GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.

groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.

우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.

한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.

PairRDDFunctions.scala 를 살펴 보면 다음과 같다.

groupByKey ()

  • 매우 비싼 동작

  • 그룹의 순서는 보장하지 않는다. 그리고 key는 메모리에 존재하고, 만약 키가 너무 많으면 OutofMemory가 발생한다.

**

  • Group the values for each key in the RDD into a single sequence. Allows controlling the
  • partitioning of the resulting key-value pair RDD by passing a Partitioner.
  • The ordering of elements within each group is not guaranteed, and may even differ
  • each time the resulting RDD is evaluated.
  • @note This operation may be very expensive. If you are grouping in order to perform an
  • aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey
  • or PairRDDFunctions.reduceByKey will provide much better performance.
  • @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
  • key in memory. If a key has too many values, it can result in an OutOfMemoryError.
  • /
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }

reduceByKey()

MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

테스트 예제 코드

전체 코드는 다음과 같다.

import org.apache.spark.sql.SparkSession

object Main{

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkUtil.getInstance();
    //예제 데이터 만들기
    val studentRDD = sparkSession.sparkContext.parallelize(Array(
      ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
      ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
      ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
      ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
      ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
      ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
      ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
      ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
      ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
      ("Juan", "Biology", 60)), 3);

    studentRDD.foreach(println);
    //RDD에서의 groupByData
    val groupBydata = studentRDD
      .groupBy((name)=> name._1);

    groupBydata.foreach(println)
    /* groupBydata 결과
    (Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))

    */


    // Pair RDD!!
    // pairRDD 로 변경하기 
    val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));

    //reduceByKey와 groupByKey 비교 

    //reduceByData
    val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);

    reduceByData.foreach(println);
    /*    reduceByData 결과
    (Tina,306)
    (Thomas,345)
    (Jackeline,306)
    (Joseph,330)
    (Juan,256)
    (Jimmy,308)
    (Cory,260)

    */


    //groupByData
    val groupByData = pairStudentSubjectScore.groupByKey();
    groupByData.foreach(println)
    /* groupByData 결과
        (Tina,CompactBuffer(78, 73, 68, 87))
      (Juan,CompactBuffer(63, 69, 64, 60))
      (Jimmy,CompactBuffer(69, 62, 97, 80))
      (Cory,CompactBuffer(56, 65, 71, 68))
      (Thomas,CompactBuffer(87, 93, 91, 74))
      (Jackeline,CompactBuffer(86, 62, 75, 83))
      (Joseph,CompactBuffer(83, 74, 91, 82))
    */

    val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))

    groupByDataSum.foreach(println)
        /* groupByDataSum 결과
        (Juan,256)
      (Thomas,345)
      (Jackeline,306)
      (Joseph,330)
      (Jimmy,308)
      (Cory,260)
      (Tina,306)
        */

    sparkSession.close();

  }

}

요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .

참고

  1. https://thebook.io/006908/part01/ch04/03/03/01/

  2. https://intellipaat.com/community/7677/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combinebykey

  3. https://www.streamhub.co.uk/apache-spark-tuning-manual/?fbclid=IwAR3xe6qYN0DN2FIZo0sdoPr1UjLYIL9QofCtgzcDcEAHGTZc5KOiTTy-ASs

Java8에 interface에서 default 메서드가 추가 된다면 어떻게 될까?

public abstract class AbstractTest {
    abstract void printString();
}

public interface InterFaceTest {
    default void printString(){
        System.out.println("PrintInterface");
    }
}

이럴 경우 AbstractTest를 상속받고 InterFaceTest를 구현하면 어떻게 될까?
public class Test extends AbstractTest implements InterFaceTest {

}

이럴 경우에는 IDE에서는 아래와 같은 충돌로 에러를 검출한다.
Class 'Test' must either be declared abstract or implement abstract method 'printString()' in 'AbstractSimple'
이럴 경우에는 아래와 같이 AbstractClass의 메서드를 구현한다.
public class Test extends AbstractTest implements InterFaceTest {
    @Override
    public void printString() {

    }
}

정리
- default Method 와 조상 클래스의 메서드 간의 충돌 : 인터페이스에서 디폴트 메서드는 무시된다.
- 여러 인터페이스의 디폴트 메서드간의 충돌 : 인터페이스를 구현한 클래스에서 디폴트 메서드를 오버라이딩 해야한다.

'ProgramLanguage > Java' 카테고리의 다른 글

JNDI (Java Naming and Directory Interface)  (0) 2020.08.30
JVM 메모리 구조 (1)  (0) 2020.03.22
JVM 튜닝  (0) 2020.02.10
Lambda_Expression(2)  (0) 2020.01.25
객체 지향 설계 5원칙 - SOLID  (0) 2019.12.26

+ Recent posts