DAG : 방향성 비순환 그래프

방향성은 있으니 비순환인 그래프

Spark에서는 DAG를 사용하여 프로그램을 스케줄링 하고 최적화 한다.

하둡 MapReduce와 Spark 사이의 차이를 정리

우선 Spark 에서의 DAG는 vertices (노드) 와 edges(선) 으로 이루어져있다.

Edge들은 동작을 나타내는 transform이 있다.


DAG : https://helloino.tistory.com/94

Spark의 DAG를 통해서 사용자는 스테이지 에서 세부적으로 확장이 가능하다

각각의 스테이지로 구성되는 작업을 RDD의 파티션에 기초해서 병렬로 동일한 연산을 수행한다.

HDFS같은 경우에는 데이터를 읽고 작업을 나누는 map과 다시 결합하는 reduce작업을 거치고 다시 hdfs에 기록이 된다. 이러한 경우에는 hdfs의 메모리 또는 디스크 메모리가 낭비된다.

Spark에서는 연속계산단계인 DAG가 형성된다. 이렇게 하나의 스테이지처리가 끝나면 셔플링 데이터를 최소화 하기 위해 실행 계획을 최적화 한다.

DAG 스케줄러는 운영자를 작업 단계로 나눈다. 스테이지에는 입력 데이터의 파티션을 기반으로 작업을 진행한다.

Spark가 lazy evaluation으로 수행이 되어 stage별로 실행이 되는데 이유는 action이 시작되는 시점에서 transformation 끼리의 연계를 파악해 실행 계획의 최적화가 이루어진다. 여기서 최적화는 대부분 지역성(Locality)에 관한 것이다. 
 이전의 MapReduce 같은 경우에도 map과 reduce연산 중간에 동작하는 셔플은 데이터를 전달하는데 많은 비용이 든다. 이것을 줄이기 위해 많이 신경 써야했지만, Spark는 map filter 등 Narrow transformation같은 경우에는 해당 partition에서 실행되므로 효율적인 계획을 세워서 수행을 한다.

Lineage란 ??

여기 까지 하면 transformation 밖에 없기 때문에 만들어지지는 않고 lineage라는 형태로 rdd가 형성된다. 이것이 Lineage : logical plan 이다.

모든 상위의 RDD가 파생된 RDD에 연결되는 방법을 나타내는 그래프

RDD . toDebugString 메서드로 알 수 있다.

예제

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.filter(x => x==5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:25

scala> val rdd3 = rdd2.map(x=>(x,x))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25

scala> rdd3.toDebugString
res1: String =
(8) MapPartitionsRDD[2] at map at <console>:25 []
 |  MapPartitionsRDD[1] at filter at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24


What is difference between lineage and DAG ?

dag는 physical plan 이다 . 스테이지는 또다른 어떤 스테이지에 의존하는지 에 대한 실질적인 처리 를 나타낸다.

실질적으로 Spark가 처리하는 물리적인 계획은 DAG에 그려진다.


예제) https://www.youtube.com/watch?v=NGOD7JN6azM

Spark DAG: https://data-flair.training/blogs/dag-in-apache-spark/

Spark 이점 : https://brocess.tistory.com/104

SparkSession, SparkContext, SQLContext, HiveContext 너무 많다 .


Spark2.0이후 부터는 아래와 같은 계층 구조를 가진다.

SparkSession > SparkContext > HiveContext > SQLContext


SparkSession은 spark2.X 이후부터 지원한다.


SparkSession

Spark 2.0 이후부터는 SparkSession을 사용해서 Spark Dataset, DataFrame API 를 시작할수 있다.

또한 SparkContext에서 사용 가능한 모든 기능은 SparkSession에서도 사용이 가능하다.

SparkContext를 선호한다면 SparkContext를 계속 사용 할 수 있다.

builder를 사용해서 REPL, notebooks를 만들어낼수 있으며, 기존에 존재하는 세션을 사용할 수 있다.

SparkSession.builder
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html#newSession--


SparkContext

SparkSession으로 시작을 해서 SparkContext( )를 하고 싶다면 아래와 같이

SparkSession에서 sparkContext() 메서드를 사용하면 SparkContext를 사용할 수 있다.

SparkConext는 Spark 클러스터에 대한 연결을 나타내며 해당 클러스터에서 RDD, broadcast, accumulator 등의 변수를 사용하여 사용이 가능하다.

JVM 당 하나의 SparkContext만 활성화 될 수 있다.


SparkContext의 객체를 구성

SparkContext()시스템 속성에서 설정을로드하는 SparkContext를 만듭니다 (예 : ./bin/spark-submit으로 시작할 때).
SparkContext(SparkConf config)
SparkContext(String master, String appName, SparkConf conf)일반적인 Spark 속성을 직접 설정할 수있는 대체 생성자
SparkContext(String master, String appName, String sparkHome, scala.collection.Seq jars, scala.collection.Map environment)일반적인 Spark 속성을 직접 설정할 수있는 대체 생성자


https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/SparkContext.html


Spark SQLContext

org.apache.spark.sql.SQLContext

Spark 1버전에서 는 SQLContext 는 구조적 api를 사용할때 만들어서 사용할때 사용했다.

그러나 Spark 2 부터는 SparkSession에서 접근이 바로 가능하도록 변경되었다.

//spark 1.x
SQLContext.sql(String sqlText)
//spark 2.x
SparkSession.sql(String sqlText)

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SQLContext.html


Spark HiveContext

SparkHiveContext는 가장 아래 위치

Spark SQL 실행 엔진 하나의 인스턴스 이며, hive에 저장된 데이터와 통합됨.

org.apache.spark.sql.hive
Class HiveContext
Object
  org.apache.spark.sql.SQLContext
    org.apache.spark.sql.hive.HiveContext

Hive 에 대한 구성은 conf에 hive-site.xml에서 읽기가 가능하다.

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/hive/HiveContext.html


참고

https://gankrin.org/sparksession-vs-sparkcontext-vs-sqlcontext-vs-hivecontext/

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html#newSession--

'BackEnd > Spark' 카테고리의 다른 글

[Spark] GroupByKey, ReduceByKey 차이 (PairRDD)  (0) 2020.03.18
Spark Directed acyclic graph, lineage  (0) 2020.03.17
SparkSession newSession() 테스트  (0) 2020.03.15
SparkSQL Casting 타입 변환, 오류 검출  (0) 2020.03.11
Spark SQL API  (0) 2020.03.09

이전에 글인 https://deviscreen.tistory.com/78?category=789556 에서 SparkSession을 사용하여 멀티 세션을 설명 했는데 더 자세한 테스트를 진행했다.


추가 테스트

SparkSession을 처음 생성하고 이후에 newSession을 만들었다.

디버깅으로 session는 어떤 값들을 가지고 있는지 테스트 목적

SparkSession session = new SparkUtil().getSparkSession();
SparkSession newSession= session.newSession();

디버깅하면서 코드를 확인한 결과 session과 newSession는 다른 주소 값을 가지고 있다.

session = {SparkSession@4254} 
newSession = {SparkSession@4772} 

session의 값

session = {SparkSession@4254} 
 sparkContext = {SparkContext@4255} 
 existingSharedState = {None$@4256} "None"
 parentSessionState = {None$@4256} "None"
 extensions = {SparkSessionExtensions@4257} 
 creationSite = {CallSite@4258} "CallSite(getOrCreate at SparkUtil.java:13,org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:957)\nSparkUtil.<init>(SparkUtil.java:13)\nSparkMain.main(SparkMain.java:6))"
 sharedState = {SharedState@4767} 
 initialSessionOptions = {HashMap@4259} "HashMap" size = 4
 sessionState = {SessionState@4768} 
 sqlContext = {SQLContext@4261} 

newSession 의 값

newSession = {SparkSession@4772} 
 sparkContext = {SparkContext@4255} 
 existingSharedState = {Some@4774} "Some(org.apache.spark.sql.internal.SharedState@2c1d57bc)"
 parentSessionState = {None$@4256} "None"
 extensions = {SparkSessionExtensions@4257} 
 creationSite = {CallSite@4775} "CallSite(newSession at SparkMain.java:10,org.apache.spark.sql.SparkSession.newSession(SparkSession.scala:240)\nSparkMain.main(SparkMain.java:10))"
 sharedState = null
 initialSessionOptions = {HashMap@4776} "HashMap" size = 0
 sessionState = null
 sqlContext = {SQLContext@4777} 

SparkContext는 같은 값을 가지고 있지만 sqlContext는 다른 값을 가지고 있다.

이것을 보면 SparkSession의 newSession( ) 메서드는 개별의 독립된 sqlContext를 구성한다. 그리고 기본 SparkContext및 캐시된 데이터를 공유하면서 세션을 시작 할 수 있다.

'BackEnd > Spark' 카테고리의 다른 글

Spark Directed acyclic graph, lineage  (0) 2020.03.17
SparkSession, SparkContext, SQLContext, HiveContext  (0) 2020.03.15
SparkSQL Casting 타입 변환, 오류 검출  (0) 2020.03.11
Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09

+ Recent posts