GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.
groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.
우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.
한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.
PairRDDFunctions.scala 를 살펴 보면 다음과 같다.
groupByKey ()
- Group the values for each key in the RDD into a single sequence. Allows controlling the
- partitioning of the resulting key-value pair RDD by passing a Partitioner.
- The ordering of elements within each group is not guaranteed, and may even differ
- each time the resulting RDD is evaluated.
- @note This operation may be very expensive. If you are grouping in order to perform an
- aggregation (such as a sum or average) over each key, using
- or
will provide much better performance.
- @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
- key in memory. If a key has too many values, it can result in an
- /
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
테스트 예제 코드
전체 코드는 다음과 같다.
import org.apache.spark.sql.SparkSession
object Main{
def main(args: Array[String]): Unit = {
val sparkSession = SparkUtil.getInstance();
//예제 데이터 만들기
val studentRDD = sparkSession.sparkContext.parallelize(Array(
("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
("Juan", "Biology", 60)), 3);
//RDD에서의 groupByData
val groupBydata = studentRDD
.groupBy((name)=> name._1);
/* groupBydata 결과
(Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))
// Pair RDD!!
// pairRDD 로 변경하기
val pairStudentSubjectScore =>(student._1,student._3));
//reduceByKey와 groupByKey 비교
val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);
/* reduceByData 결과
val groupByData = pairStudentSubjectScore.groupByKey();
/* groupByData 결과
(Tina,CompactBuffer(78, 73, 68, 87))
(Juan,CompactBuffer(63, 69, 64, 60))
(Jimmy,CompactBuffer(69, 62, 97, 80))
(Cory,CompactBuffer(56, 65, 71, 68))
(Thomas,CompactBuffer(87, 93, 91, 74))
(Jackeline,CompactBuffer(86, 62, 75, 83))
(Joseph,CompactBuffer(83, 74, 91, 82))
val groupByDataSum =>( value._1,value._2.sum))
/* groupByDataSum 결과
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .