RDD연산자의 종류는 transformation 과 action둘로 나뉘는데
transformation 은 새로운 RDD 를 생성
action 은 RDD 의 연산자를 호출함
스파크에서 transformation , 과 action둘의 지연 실행
spark 에 대해서 lazy evaluation 개념이 중요한데, 처음에는 lazy 에 대한 이해를 하지 못 한채 그냥 그렇구나 했는데, 개념은 다음과 같다.
transformation의 지연 실행은 action 연산자를 호출하기 전까지는 transformation 연산자의 계산을 실제로 실행 하지 않는 것을 의미한다.
이는 RDD에 action연산자가 호출되면 스파크는 해당 RDD 의 계보를 살펴본 후, 이를 바탕으로 실행해야하는 연산 그래프를 작성해서 action 연산자를 계산한다.
결론은 transformation 연산자는 action 연산자를 호출했을때, 무슨 연산이 어떤 순서로 실행되어야 할지 알려주는 일종의 설계도 라고 할 수 있다.
책의 예제를 따라다 우연히 lazy evaluation 의 예제를 찾은 것같다.
scala> val lines = sc.textFile("/home/morris01/study/spark/data/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /home/morris01/study/spark/data/client-ids.log MapPartitionsRDD[4] at textFile at <console>:24
scala> val idsStr = lines.map(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:26
scala> idsStr.foreach(println)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/morris01/study/spark/data/client-ids.log
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
위의 실행 예제가 에러가 난이유는 filepath가 잘못되어서 나온 에러이다.
그러나 idsStr.foreach를 실행하기전까지는 순수히 진행이 되는 것 같았다. 하지면 foreach 라는 action을 수행을 하면서 이전의 RDD 의 계보를 살펴보다가 잘못되어서 에러가 발생한것같다
RDD 연산자
원본 RDD 의 각 요소를 변환한 후 변환된 요소로 새로운 RDD를 생성하는 map 변환 연산자
scala> val numbers = sc.parallelize(10 to 50 by 10)
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> numbers.foreach(x=>println(x))
10
20
30
40
50
scala> val numberSquared = numbers.map(num=>num*num)
numberSquared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:26
scala> numberSquared.foreach(x=>println(x))
100
400
900
1600
2500
scala> numberSquared.foreach(println)
100
400
900
1600
distinct, flatMap 연산자
예제 데이터는 물건을 구매한 ID 값을 가진 log 파일이다.
echo "15,16,20,20
77,80,94
94,98,16,31
31,15,20" > ~/client-ids.log
scala> val lines = sc.textFile("/home/morris01/study/spark/data/sparkinaction/client-ids.log")
lines: org.apache.spark.rdd.RDD[String] = /home/morris01/study/spark/data/sparkinaction/client-ids.log MapPartitionsRDD[7] at textFile at <console>:24
scala> val idsStr = lines.map(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at map at <console>:26
scala> idsStr.foreach(println)
[Ljava.lang.String;@77278a7d
[Ljava.lang.String;@6876c229
[Ljava.lang.String;@25f5ac40
[Ljava.lang.String;@2d06d673
/*
idsStr 에는 string 하나, 하나의 rdd 가생성되는걸로 예상했는데
string 배열을 가진 RDD 가 생성되었다.
*/
scala> idsStr.first
res5: Array[String] = Array(15, 16, 20, 20)
scala> idsStr.collect
res6: Array[Array[String]] = Array(Array(15, 16, 20, 20), Array(77, 80, 94), Array(94, 98, 16, 31), Array(31, 15, 20))
/*
collect 를 사용하여 새로운 배열을 생성 , RDD의 모든 요소를 이 배열에 모아서 반환
*/
이 배열을 단일 배열로 분해 하려면 flatMap을 사용하게된다.
flatMap은 RDD 모든 요소에 적용이 된다.
익명함수가 반환한 배열의 중첩구조를 한단계 제거하고 모든 배열의 요소를 단일 컬렌션으로 병합한다는것이 flatmap 과 map 의 다른 점이다.
scala 에 대한 지식중 TraversableOnce 에 대해서 꼭 알 필요가 있다.
이유는 flatMap의 시그니쳐는 다음과 같이 가지고 있기 때문이다.
def flatMap[U](f:(T)=>TraversableOnce[U]):RDD[U]
map으로 연산을 했던 것을 flatMap을 사용하게 되면 하나의 배열로 값을 불러올 수 있는 것을 확인 할 수 있다.
scala> val idsStr = lines.flatMap(line=>line.split(","))
idsStr: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
scala> idsStr.collect
res1: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
/*
String 의 값을 Int 로 반환해주기 위해서는 _.toInt 메서드를 사용하면된다.
*/
scala> val idsInt = idsStr.map(_.toInt)
idsInt: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:28
scala> idsInt.collect
res2: Array[Int] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)
Distinct
구매 고객들의 아이디 값 들을 연산하기 쉽게 하나의 배열로 나타냈지만, 구매고객의 수를 구하려면 중복을 제거를 해주어야한다.
보통은 Scala의 Set 함수에 다시 넣을수도있겠지만, 간편하게 Distinct 를 사용하면 된다.
scala> val uniqueIds = idsInt.distinct
uniqueIds: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at distinct at <console>:30
scala> uniqueIds.collect
res4: Array[Int] = Array(15, 77, 16, 80, 98, 20, 31, 94)
scala> val finalCount = uniqueIds.count
finalCount: Long = 8
예제 파일 github : https://github.com/spark-in-action/first-edition/blob/master/ch02/scala/ch02-listings.scala
'BackEnd > Spark' 카테고리의 다른 글
Spark BroadCast (0) | 2019.08.28 |
---|---|
SPARK 에서의 기본 행동(action) 연산자 및 변환(transformation)연산자(2) (0) | 2018.06.18 |
Spark(3) SparkContext-1 (0) | 2018.05.16 |
Spark (2) 기본예제 및 scala (0) | 2018.05.15 |
SPARK(1)-환경 구축 (1) | 2018.05.13 |