RDD연산자의 종류는 transformation 과 action둘로 나뉘는데

transformation 은 새로운 RDD 를 생성

action 은 RDD 의 연산자를 호출함

스파크에서 transformation , 과 action둘의 지연 실행

spark 에 대해서 lazy evaluation 개념이 중요한데, 처음에는 lazy 에 대한 이해를 하지 못 한채 그냥 그렇구나 했는데, 개념은 다음과 같다.

transformation의 지연 실행은 action 연산자를 호출하기 전까지는 transformation 연산자의 계산을 실제로 실행 하지 않는 것을 의미한다.
이는 RDD에 action연산자가 호출되면 스파크는 해당 RDD 의 계보를 살펴본 후, 이를 바탕으로 실행해야하는 연산 그래프를 작성해서 action 연산자를 계산한다.
결론은 transformation 연산자는 action 연산자를 호출했을때, 무슨 연산이 어떤 순서로 실행되어야 할지 알려주는 일종의 설계도 라고 할 수 있다.

책의 예제를 따라다 우연히 lazy evaluation 의 예제를 찾은 것같다.

scala> val lines = sc.textFile("/home/morris01/study/spark/data/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /home/morris01/study/spark/data/client-ids.log MapPartitionsRDD[4] at textFile at <console>:24

scala> val idsStr = lines.map(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:26

scala> idsStr.foreach(println)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/morris01/study/spark/data/client-ids.log
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)

위의 실행 예제가 에러가 난이유는 filepath가 잘못되어서 나온 에러이다.

그러나 idsStr.foreach를 실행하기전까지는 순수히 진행이 되는 것 같았다. 하지면 foreach 라는 action을 수행을 하면서 이전의 RDD 의 계보를 살펴보다가 잘못되어서 에러가 발생한것같다

RDD 연산자

원본 RDD 의 각 요소를 변환한 후 변환된 요소로 새로운 RDD를 생성하는 map 변환 연산자

scala> val numbers = sc.parallelize(10 to 50 by 10)
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> numbers.foreach(x=>println(x))
10
20
30
40
50

scala> val numberSquared = numbers.map(num=>num*num)
numberSquared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:26

scala> numberSquared.foreach(x=>println(x))
100
400
900
1600
2500

scala> numberSquared.foreach(println)
100
400
900
1600

distinct, flatMap 연산자

예제 데이터는 물건을 구매한 ID 값을 가진 log 파일이다.

echo "15,16,20,20
77,80,94
94,98,16,31
31,15,20" > ~/client-ids.log

scala> val lines = sc.textFile("/home/morris01/study/spark/data/sparkinaction/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /home/morris01/study/spark/data/sparkinaction/client-ids.log MapPartitionsRDD[7] at textFile at <console>:24

scala> val idsStr = lines.map(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at map at <console>:26

scala> idsStr.foreach(println)
[Ljava.lang.String;@77278a7d
[Ljava.lang.String;@6876c229
[Ljava.lang.String;@25f5ac40
[Ljava.lang.String;@2d06d673
/*
idsStr 에는 string 하나, 하나의 rdd 가생성되는걸로 예상했는데
string 배열을 가진 RDD 가 생성되었다.
*/
scala> idsStr.first
res5: Array[String] = Array(15, 16, 20, 20)

scala> idsStr.collect
res6: Array[Array[String]] = Array(Array(15, 16, 20, 20), Array(77, 80, 94), Array(94, 98, 16, 31), Array(31, 15, 20))
/*
collect 를 사용하여 새로운 배열을 생성 , RDD의 모든 요소를 이 배열에 모아서 반환
*/

이 배열을 단일 배열로 분해 하려면 flatMap을 사용하게된다.

flatMap은 RDD 모든 요소에 적용이 된다.

익명함수가 반환한 배열의 중첩구조를 한단계 제거하고 모든 배열의 요소를 단일 컬렌션으로 병합한다는것이 flatmap 과 map 의 다른 점이다.

scala 에 대한 지식중 TraversableOnce 에 대해서 꼭 알 필요가 있다.

이유는 flatMap의 시그니쳐는 다음과 같이 가지고 있기 때문이다.

def flatMap[U](f:(T)=>TraversableOnce[U]):RDD[U]

map으로 연산을 했던 것을 flatMap을 사용하게 되면 하나의 배열로 값을 불러올 수 있는 것을 확인 할 수 있다.

scala> val idsStr = lines.flatMap(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> idsStr.collect
res1: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
/*
String 의 값을 Int 로 반환해주기 위해서는 _.toInt 메서드를 사용하면된다.
*/
scala> val idsInt = idsStr.map(_.toInt)
idsInt: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:28

scala> idsInt.collect
res2: Array[Int] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

Distinct

구매 고객들의 아이디 값 들을 연산하기 쉽게 하나의 배열로 나타냈지만, 구매고객의 수를 구하려면 중복을 제거를 해주어야한다.

보통은 Scala의 Set 함수에 다시 넣을수도있겠지만, 간편하게 Distinct 를 사용하면 된다.

scala> val uniqueIds = idsInt.distinct
uniqueIds: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at distinct at <console>:30

scala> uniqueIds.collect
res4: Array[Int] = Array(15, 77, 16, 80, 98, 20, 31, 94)

scala> val finalCount = uniqueIds.count
finalCount: Long = 8

예제 파일 github : https://github.com/spark-in-action/first-edition/blob/master/ch02/scala/ch02-listings.scala

'BackEnd > Spark' 카테고리의 다른 글

Spark BroadCast  (0) 2019.08.28
SPARK 에서의 기본 행동(action) 연산자 및 변환(transformation)연산자(2)  (0) 2018.06.18
Spark(3) SparkContext-1  (0) 2018.05.16
Spark (2) 기본예제 및 scala  (0) 2018.05.15
SPARK(1)-환경 구축  (1) 2018.05.13

+ Recent posts