GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.
groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.
우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.
한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.
PairRDDFunctions.scala 를 살펴 보면 다음과 같다.
groupByKey ()
매우 비싼 동작
그룹의 순서는 보장하지 않는다. 그리고 key는 메모리에 존재하고, 만약 키가 너무 많으면 OutofMemory가 발생한다.
**
- Group the values for each key in the RDD into a single sequence. Allows controlling the
- partitioning of the resulting key-value pair RDD by passing a Partitioner.
- The ordering of elements within each group is not guaranteed, and may even differ
- each time the resulting RDD is evaluated.
- @note This operation may be very expensive. If you are grouping in order to perform an
- aggregation (such as a sum or average) over each key, using
PairRDDFunctions.aggregateByKey
- or
PairRDDFunctions.reduceByKey
will provide much better performance. - @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
- key in memory. If a key has too many values, it can result in an
OutOfMemoryError
. - /
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
reduceByKey()
MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
테스트 예제 코드
전체 코드는 다음과 같다.
import org.apache.spark.sql.SparkSession
object Main{
def main(args: Array[String]): Unit = {
val sparkSession = SparkUtil.getInstance();
//예제 데이터 만들기
val studentRDD = sparkSession.sparkContext.parallelize(Array(
("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
("Juan", "Biology", 60)), 3);
studentRDD.foreach(println);
//RDD에서의 groupByData
val groupBydata = studentRDD
.groupBy((name)=> name._1);
groupBydata.foreach(println)
/* groupBydata 결과
(Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))
*/
// Pair RDD!!
// pairRDD 로 변경하기
val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));
//reduceByKey와 groupByKey 비교
//reduceByData
val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);
reduceByData.foreach(println);
/* reduceByData 결과
(Tina,306)
(Thomas,345)
(Jackeline,306)
(Joseph,330)
(Juan,256)
(Jimmy,308)
(Cory,260)
*/
//groupByData
val groupByData = pairStudentSubjectScore.groupByKey();
groupByData.foreach(println)
/* groupByData 결과
(Tina,CompactBuffer(78, 73, 68, 87))
(Juan,CompactBuffer(63, 69, 64, 60))
(Jimmy,CompactBuffer(69, 62, 97, 80))
(Cory,CompactBuffer(56, 65, 71, 68))
(Thomas,CompactBuffer(87, 93, 91, 74))
(Jackeline,CompactBuffer(86, 62, 75, 83))
(Joseph,CompactBuffer(83, 74, 91, 82))
*/
val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))
groupByDataSum.foreach(println)
/* groupByDataSum 결과
(Juan,256)
(Thomas,345)
(Jackeline,306)
(Joseph,330)
(Jimmy,308)
(Cory,260)
(Tina,306)
*/
sparkSession.close();
}
}
요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .
참고
'BackEnd > Spark' 카테고리의 다른 글
[Spark]Apache Spark 란? 특징 , 왜 쓸까? (0) | 2020.03.22 |
---|---|
[Spark] 텍스트를 RDD로? Dataset, DataFrame으로 읽어오는법 (0) | 2020.03.22 |
Spark Directed acyclic graph, lineage (0) | 2020.03.17 |
SparkSession, SparkContext, SQLContext, HiveContext (0) | 2020.03.15 |
SparkSession newSession() 테스트 (0) | 2020.03.15 |