SparkSession, SparkContext, SQLContext, HiveContext 너무 많다 .


Spark2.0이후 부터는 아래와 같은 계층 구조를 가진다.

SparkSession > SparkContext > HiveContext > SQLContext


SparkSession은 spark2.X 이후부터 지원한다.


SparkSession

Spark 2.0 이후부터는 SparkSession을 사용해서 Spark Dataset, DataFrame API 를 시작할수 있다.

또한 SparkContext에서 사용 가능한 모든 기능은 SparkSession에서도 사용이 가능하다.

SparkContext를 선호한다면 SparkContext를 계속 사용 할 수 있다.

builder를 사용해서 REPL, notebooks를 만들어낼수 있으며, 기존에 존재하는 세션을 사용할 수 있다.

SparkSession.builder
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html#newSession--


SparkContext

SparkSession으로 시작을 해서 SparkContext( )를 하고 싶다면 아래와 같이

SparkSession에서 sparkContext() 메서드를 사용하면 SparkContext를 사용할 수 있다.

SparkConext는 Spark 클러스터에 대한 연결을 나타내며 해당 클러스터에서 RDD, broadcast, accumulator 등의 변수를 사용하여 사용이 가능하다.

JVM 당 하나의 SparkContext만 활성화 될 수 있다.


SparkContext의 객체를 구성

SparkContext()시스템 속성에서 설정을로드하는 SparkContext를 만듭니다 (예 : ./bin/spark-submit으로 시작할 때).
SparkContext(SparkConf config)
SparkContext(String master, String appName, SparkConf conf)일반적인 Spark 속성을 직접 설정할 수있는 대체 생성자
SparkContext(String master, String appName, String sparkHome, scala.collection.Seq jars, scala.collection.Map environment)일반적인 Spark 속성을 직접 설정할 수있는 대체 생성자


https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/SparkContext.html


Spark SQLContext

org.apache.spark.sql.SQLContext

Spark 1버전에서 는 SQLContext 는 구조적 api를 사용할때 만들어서 사용할때 사용했다.

그러나 Spark 2 부터는 SparkSession에서 접근이 바로 가능하도록 변경되었다.

//spark 1.x
SQLContext.sql(String sqlText)
//spark 2.x
SparkSession.sql(String sqlText)

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SQLContext.html


Spark HiveContext

SparkHiveContext는 가장 아래 위치

Spark SQL 실행 엔진 하나의 인스턴스 이며, hive에 저장된 데이터와 통합됨.

org.apache.spark.sql.hive
Class HiveContext
Object
  org.apache.spark.sql.SQLContext
    org.apache.spark.sql.hive.HiveContext

Hive 에 대한 구성은 conf에 hive-site.xml에서 읽기가 가능하다.

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/hive/HiveContext.html


참고

https://gankrin.org/sparksession-vs-sparkcontext-vs-sqlcontext-vs-hivecontext/

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html#newSession--

'BackEnd > Spark' 카테고리의 다른 글

[Spark] GroupByKey, ReduceByKey 차이 (PairRDD)  (0) 2020.03.18
Spark Directed acyclic graph, lineage  (0) 2020.03.17
SparkSession newSession() 테스트  (0) 2020.03.15
SparkSQL Casting 타입 변환, 오류 검출  (0) 2020.03.11
Spark SQL API  (0) 2020.03.09

이전에 글인 https://deviscreen.tistory.com/78?category=789556 에서 SparkSession을 사용하여 멀티 세션을 설명 했는데 더 자세한 테스트를 진행했다.


추가 테스트

SparkSession을 처음 생성하고 이후에 newSession을 만들었다.

디버깅으로 session는 어떤 값들을 가지고 있는지 테스트 목적

SparkSession session = new SparkUtil().getSparkSession();
SparkSession newSession= session.newSession();

디버깅하면서 코드를 확인한 결과 session과 newSession는 다른 주소 값을 가지고 있다.

session = {SparkSession@4254} 
newSession = {SparkSession@4772} 

session의 값

session = {SparkSession@4254} 
 sparkContext = {SparkContext@4255} 
 existingSharedState = {None$@4256} "None"
 parentSessionState = {None$@4256} "None"
 extensions = {SparkSessionExtensions@4257} 
 creationSite = {CallSite@4258} "CallSite(getOrCreate at SparkUtil.java:13,org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:957)\nSparkUtil.<init>(SparkUtil.java:13)\nSparkMain.main(SparkMain.java:6))"
 sharedState = {SharedState@4767} 
 initialSessionOptions = {HashMap@4259} "HashMap" size = 4
 sessionState = {SessionState@4768} 
 sqlContext = {SQLContext@4261} 

newSession 의 값

newSession = {SparkSession@4772} 
 sparkContext = {SparkContext@4255} 
 existingSharedState = {Some@4774} "Some(org.apache.spark.sql.internal.SharedState@2c1d57bc)"
 parentSessionState = {None$@4256} "None"
 extensions = {SparkSessionExtensions@4257} 
 creationSite = {CallSite@4775} "CallSite(newSession at SparkMain.java:10,org.apache.spark.sql.SparkSession.newSession(SparkSession.scala:240)\nSparkMain.main(SparkMain.java:10))"
 sharedState = null
 initialSessionOptions = {HashMap@4776} "HashMap" size = 0
 sessionState = null
 sqlContext = {SQLContext@4777} 

SparkContext는 같은 값을 가지고 있지만 sqlContext는 다른 값을 가지고 있다.

이것을 보면 SparkSession의 newSession( ) 메서드는 개별의 독립된 sqlContext를 구성한다. 그리고 기본 SparkContext및 캐시된 데이터를 공유하면서 세션을 시작 할 수 있다.

'BackEnd > Spark' 카테고리의 다른 글

Spark Directed acyclic graph, lineage  (0) 2020.03.17
SparkSession, SparkContext, SQLContext, HiveContext  (0) 2020.03.15
SparkSQL Casting 타입 변환, 오류 검출  (0) 2020.03.11
Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09

sparkSQL을 사용하면 손쉽게 타입 변환을 할 수 있다.

문자열 -> INT

sparkSession.sql("select INT('23')").show(1);
결과 : 
+---------------+
|CAST(23 AS INT)|
+---------------+
|             23|
+---------------+

하지만 INT( '가나다라') 일 때는 오류를 반환하는 것이 아니라 null 로 반환을 한다.

null 로 반환하는 것이 아니라 오류를 반환 하고 싶으면 아래와 같이 사용하면된다.

SparkSession에 udf (User Define Function : 사용자 지정함수)를 만들고 등록한다.

//SparkSession에 UDF를 정의 하고 등록
sparkSession.udf().register("NumberCasting", (String str)->Integer.parseInt(str), DataTypes.IntegerType);

사용 방법은 기존의 sql 처럼 사용하면된다.

//사용 방법
sparkSession.sql("select NumberCasting('가나다라') ").show()

spark sql 의 udf를 사용하게 되면 null 값이 아닌 오류를 출력 할 수 있다.

...
Caused by: java.lang.NumberFormatException: For input string: "가나다라"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)

사용자정의 함수 만들어서 등록하기 !!

UDFRegistration.scala 에 등록되어있는 register 함수 설명

첫 번째 : 파라미터는 새롭게 만들 udf의 이름

두 번째 : value를 어떻게 변환할것인가의 사용자 정의 코드 (java 에서는 functional인터페이스 사용 가능)

세 번째 : 반환될 데이터 타입

def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
    val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
    def builder(e: Seq[Expression]) = if (e.length == 1) {
      ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
    } else {
      throw new AnalysisException("Invalid number of arguments for function " + name +
        ". Expected: 1; Found: " + e.length)
    }
    functionRegistry.createOrReplaceTempFunction(name, builder)
  }

'BackEnd > Spark' 카테고리의 다른 글

SparkSession, SparkContext, SQLContext, HiveContext  (0) 2020.03.15
SparkSession newSession() 테스트  (0) 2020.03.15
Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09
SparkUI 에러 InjectionManagerFactory not found  (0) 2020.03.06

spark를 실행 하고 있으면 cleaned accumulator 숫자

INFO org.apache.spark.ContextCleaner - Cleaned accumulator 126 의 형태를 볼수 있다 무엇을 의미 하는 것 일까 ?

 

19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 126 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 145 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 152 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 130 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 144 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 142

 

 

org.apache.spark.ContextCleaner 클래스 API 설명

 

RDD, 셔플 및 브로드캐스트 상태를 위한 비동기식 클리너

관련 object가 응용 프로그램 범위를 벗어날대, 처리 될 각 RDD, shuffleDependency 및 관심이 있는 브로드 캐스트에 대한 약한 참조는 유지됩니다.실제로 처리는 별도의 데몬 스레드에서 수행됨.

https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/ContextCleaner.html

 

stackoverflow 답변 중

 

ContextCleaner는 드라이버에서 실행 된다. SparkContext가 시작될때 작성되고 즉시 시작된다. RDD, 셔플 및 브로드캐스트 상태, accumulate를 정리하는 컨텍스트 클리너 스레드(keepCleaning 메소드 사용). context-cleaner-periodic-gc는 JVM 가비지 콜렉터를 요청함.

https://stackoverflow.com/questions/55452892/contextcleaner-cleaned-accumulator-what-does-it-mean-in-scala-spark

SparkUI 에러 InjectionManagerFactory not found

springframework 에서 spark를 조작하는데 spark UI에서 executor를 실행하면 아래와 같은 오류가 발생한다.

Caused by: java.lang.IllegalStateException: InjectionManagerFactory not found.

    at org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:74)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:74)
    at org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:69)
    at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:259)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:311)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:154)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:346)
    at javax.servlet.GenericServlet.init(GenericServlet.java:203)
    at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
    ... 23 common frames omitted

해결방법은 아래의 dependency를 추가해주면된다.

implementation 'org.glassfish.jersey.inject:jersey-hk2:2.30'

'BackEnd > Spark' 카테고리의 다른 글

Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09
[Spark] RDD Persistence 와 Caching  (0) 2020.03.05
스파크 SQL 사용하여 파일로 보내기  (0) 2020.03.02
java.io.InvalidClassException: org.apache.spark.rdd.RDD  (0) 2020.02.18


RDD의 persistence, Caching에 대해서 정리

Persistence와 Cache가 무엇인가가?

RDD 를 영구적으로 저장하는 기술로 결과를 즉시 저장해두었다가 이후에 필요하면 다시 불러와서 사용이 가능하다. 이것으로 컴퓨팅오버헤드를 줄일 수 있다.

이럴때 RDD를 저장하는 방법은 cache() 와 persist() 메서드가 있다.

cache() 메서드는 모든 메모리의 모든 RDD를 저장할 수 있다. 그리고 RDD를 메모리에 유지하고 병렬작업에서 효율적으로 사용 할 수 있다.

Cache()와 Persist()의 메서드의 차이는 ?

cache()와 persist()의 차이점 : cache()를 사용하면 기본 저장 장소가 MEMORY_ONRY 이고 persist를 하면 다양한 저장소에 수준을 저장할 수 있다.

RDD의 파티션이 손실되면 원래 생성한 변환작업을 통해 다시 복구한다.

Spark 에서 Persist가 필요한 이유

RDD 와 RDD 에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 되는데, 호출하는 액션들에 대한 모든의존성을 재연산하게 된다.

이때 데이터를 여러번 스캔하는 반복알고리즘들에 대해서는 매우 무거운 작업일 수 있다.

persist 함으로서 반복적인 알고리즘과 메모리의 소비를 줄 일 수 있다.

여러번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청할 수 있다.

RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파티션들을 저장하고 있게 된다.

자바에서는 기본적으로 persist()가 데이터를 JVM 힙(heap) 에 직렬화되지 않는 객체 형태로 저장.

  




레벨  공간사용  CPU 사용시간 메모리에 저장 디스크에 저장 비고
MEMORY_ONLY  높음  낮음  예  아니오  
MEMORY_ONLY_SER    낮음 높음   아니오  
MEMORY_AND_DISK     높음 중간 일부  일부 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장
MEMORY_AND_DISK_SER     낮음 높음 일부  일부 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장.메모리에 직렬화된 형태로 저장


DISK_ONLY 낮음 높음 아니오 예

persist() 호출은 연산을 강제로 수행하지않는다.

메모리에 많은 데이터를 올리려고 시도하면 스파크는 LRU 캐시 정책에 따라 오래된 파티션들을 자동으로 버림.

예제 코드

import org.apache.spark.storage.StorageLevel

val result = input map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))



Dataset<Row> 또한 데이터가 아니라 변환작업들을 저장하고 있어 불러올 때 마다 값이 달라질 수도있는데, 이것을 persist 하면 cached 된 데이터를 사용하기 때문에 다시 계산할 필요없다.

https://data-flair.training/blogs/apache-spark-rdd-persistence-caching/

스파크 SQL 을 파일로 보내기
coalesce(1) 을 해주는 이유는 RDD 로 분산되어있는 파티션을 하나로 모아주기 위해서

 session.sql("select * from temp").coalesce(1).write().format("com.databriccks.spark.csv").option("header","true").csv("./data/morris.csv");

java.io.InvalidClassException: org.apache.spark.rdd.RDD

java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID 에러

19:58:43.380 [dag-scheduler-event-loop] INFO  o.a.spark.scheduler.DAGScheduler - ShuffleMapStage 0 (count at TaskProcessService.java:152) failed in 0.880 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.44.80, executor 1): java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = -3328732449542231715, local class serialVersionUID = 4416556597546473068
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1843)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

spark 를 사용하는 어딘가에서 spark version이 여러개라서 오류가 나는것

저 같은 경우에는 springboot에서 spark의 버전을 2.4.0 으로 설정이 되어있고, 서버의 spark 모듈은 2.4.5라서 발생했었음

'BackEnd > Spark' 카테고리의 다른 글

[Spark] RDD Persistence 와 Caching  (0) 2020.03.05
스파크 SQL 사용하여 파일로 보내기  (0) 2020.03.02
Spark History Server 실행 오류 시  (0) 2020.02.10
SparkSession  (0) 2020.02.06
RDD 영속화(캐싱)  (0) 2019.09.02

HistoryServer 가 실행이 되지 않고, 아래와 같은 로그가 출력이 된다면

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:296)
        at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
Caused by: java.io.FileNotFoundException: Log directory specified does not exist: file:/tmp/spark-events Did you configure the correct one through spark.history.fs.logDirectory?
        at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:267)
        at org.apache.spark.deploy.history.FsHistoryProvider.initialize(FsHistoryProvider.scala:211)
        at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:207)
        at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:86)
        ... 6 more
Caused by: java.io.FileNotFoundException: File file:/tmp/spark-events does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:257)
        ... 9 more

해결 spark-default.conf 에 아래와 같이spark.history.fs.logDirectory 설정을 추가 해준다

spark.eventLog.enabled          true
spark.eventLog.dir              file:/opt/spark-events
spark.history.fs.logDirectory   file:/opt/spark-events

https://stackoverflow.com/questions/44835026/how-to-enable-spark-history-server-for-standalone-cluster-non-hdfs-mode?rq=1

'BackEnd > Spark' 카테고리의 다른 글

스파크 SQL 사용하여 파일로 보내기  (0) 2020.03.02
java.io.InvalidClassException: org.apache.spark.rdd.RDD  (0) 2020.02.18
SparkSession  (0) 2020.02.06
RDD 영속화(캐싱)  (0) 2019.09.02
Spark BroadCast  (0) 2019.08.28

+ Recent posts