Spark Jersey dependency 충돌

이슈 : 2020-03-23 16:21:03.349 [SparkUI-28] WARN o.s.jetty.servlet.ServletHandler - Error for /api/v1/applications
java.lang.NoSuchFieldError: INCLUDE_ALL

Jersey 라이브러리와 충돌이 나서 발생했던것

2020-03-23 16:21:03.349 [SparkUI-28] WARN  o.s.jetty.servlet.ServletHandler - Error for /api/v1/applications
java.lang.NoSuchFieldError: INCLUDE_ALL
    at org.glassfish.jersey.server.ResourceConfig$State.<init>(ResourceConfig.java:114)
    at org.glassfish.jersey.server.ResourceConfig.<init>(ResourceConfig.java:356)
    at org.glassfish.jersey.servlet.WebComponent.createResourceConfig(WebComponent.java:578)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:356)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
    at org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499)
    at org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791)
    at org.spark_project.jetty.servlet.ServletHolder.prepare(ServletHolder.java:776)
    at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:580)
    at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:513)
    at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
    at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.spark_project.jetty.server.Server.handle(Server.java:539)
    at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
    at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
    at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:748)

해결 방법

기존의 build.gradle에 Jetty를 별도로추가해서 충돌이 일어나 발생하는것 같다.

예전 gradle.build

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.apache.spark:spark-core_2.11:2.4.5'
    implementation 'org.apache.spark:spark-sql_2.11:2.4.5'
    implementation 'org.apache.spark:spark-hive_2.11:2.4.5'
    implementation 'org.codehaus.janino:janino:3.0.8'
    implementation 'org.slf4j:integration:1.7.29'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    implementation 'org.postgresql:postgresql:42.2.10.jre7'
    //요부분 jersey
    implementation 'org.glassfish.jersey.inject:jersey-hk2:2.30'

    implementation 'com.google.code.gson:gson:2.8.6'
    implementation 'org.projectlombok:lombok:1.18.10'
    annotationProcessor 'org.projectlombok:lombok:1.18.10'
}

해결한 gradle.build

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.apache.spark:spark-core_2.11:2.4.5'
    implementation 'org.apache.spark:spark-sql_2.11:2.4.5'
    implementation 'org.apache.spark:spark-hive_2.11:2.4.5'
    implementation 'org.codehaus.janino:janino:3.0.8'

    implementation 'org.slf4j:integration:1.7.29'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    compile 'org.postgresql:postgresql:42.2.10.jre7'

    implementation 'com.google.code.gson:gson:2.8.6'
    implementation 'org.projectlombok:lombok:1.18.10'
    annotationProcessor 'org.projectlombok:lombok:1.18.10'
}

log 로 살펴본 Spark 동작 과정

spark가 실행되는 과정을 로그를 통해서 살펴 보는것도 의미가 있다고 생각해서 진행하고자 합니다.

시간 날때마다 정리를 해서 업로드 할 예정입니다.

코드는 아래와 같다.

val sparkSession = SparkUtil.getInstance();

//Load RDD
val readme = sparkSession.sparkContext.textFile("./data/README.md");
readme.toDebugString;
println(readme.count());

sparkSession.stop()

아래는 위 코드의 SparkLog 입니다.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/03/22 16:50:39 INFO SparkContext: Running Spark version 2.4.3
20/03/22 16:50:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/22 16:50:40 INFO SparkContext: Submitted application: Spark Basic Test
20/03/22 16:50:40 INFO SecurityManager: Changing view acls to: daeyunkim
20/03/22 16:50:40 INFO SecurityManager: Changing modify acls to: daeyunkim
20/03/22 16:50:40 INFO SecurityManager: Changing view acls groups to: 
20/03/22 16:50:40 INFO SecurityManager: Changing modify acls groups to: 
20/03/22 16:50:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(daeyunkim); groups with view permissions: Set(); users  with modify permissions: Set(daeyunkim); groups with modify permissions: Set()
20/03/22 16:50:40 INFO Utils: Successfully started service 'sparkDriver' on port 52540.
20/03/22 16:50:40 INFO SparkEnv: Registering MapOutputTracker
20/03/22 16:50:40 INFO SparkEnv: Registering BlockManagerMaster
20/03/22 16:50:40 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/03/22 16:50:40 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/03/22 16:50:40 INFO DiskBlockManager: Created local directory at /private/var/folders/qx/84bp85pn2y5gn6_96k267psc0000gn/T/blockmgr-fa3cf378-4049-4c69-8c24-7eaddb7af53e
20/03/22 16:50:40 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
20/03/22 16:50:40 INFO SparkEnv: Registering OutputCommitCoordinator
20/03/22 16:50:40 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/03/22 16:50:40 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.30.1.58:4040
20/03/22 16:50:40 INFO Executor: Starting executor ID driver on host localhost
20/03/22 16:50:40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52541.
20/03/22 16:50:40 INFO NettyBlockTransferService: Server created on 172.30.1.58:52541
20/03/22 16:50:40 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/03/22 16:50:41 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManagerMasterEndpoint: Registering block manager 172.30.1.58:52541 with 2004.6 MB RAM, BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.6 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.4 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.30.1.58:52541 (size: 20.4 KB, free: 2004.6 MB)
20/03/22 16:50:42 INFO SparkContext: Created broadcast 0 from textFile at TestInternalRDD.scala:12
20/03/22 16:50:42 INFO FileInputFormat: Total input paths to process : 1
20/03/22 16:50:42 INFO SparkContext: Starting job: count at TestInternalRDD.scala:14
20/03/22 16:50:42 INFO DAGScheduler: Got job 0 (count at TestInternalRDD.scala:14) with 2 output partitions
20/03/22 16:50:42 INFO DAGScheduler: Final stage: ResultStage 0 (count at TestInternalRDD.scala:14)
20/03/22 16:50:42 INFO DAGScheduler: Parents of final stage: List()
20/03/22 16:50:42 INFO DAGScheduler: Missing parents: List()
20/03/22 16:50:42 INFO DAGScheduler: Submitting ResultStage 0 (./data/README.md MapPartitionsRDD[1] at textFile at TestInternalRDD.scala:12), which has no missing parents
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2023.0 B, free 2004.4 MB)
20/03/22 16:50:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.30.1.58:52541 (size: 2023.0 B, free: 2004.6 MB)
20/03/22 16:50:42 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161
20/03/22 16:50:42 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (./data/README.md MapPartitionsRDD[1] at textFile at TestInternalRDD.scala:12) (first 15 tasks are for partitions Vector(0, 1))
20/03/22 16:50:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
20/03/22 16:50:42 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7932 bytes)
20/03/22 16:50:42 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7932 bytes)
20/03/22 16:50:42 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/03/22 16:50:42 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/03/22 16:50:42 INFO HadoopRDD: Input split: file:/Users/daeyunkim/Documents/SparkCode/BasicSparkScala/data/README.md:0+2243
20/03/22 16:50:42 INFO HadoopRDD: Input split: file:/Users/daeyunkim/Documents/SparkCode/BasicSparkScala/data/README.md:2243+2244
20/03/22 16:50:42 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 875 bytes result sent to driver
20/03/22 16:50:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 875 bytes result sent to driver
20/03/22 16:50:42 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 141 ms on localhost (executor driver) (1/2)
20/03/22 16:50:42 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 122 ms on localhost (executor driver) (2/2)
20/03/22 16:50:42 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/03/22 16:50:42 INFO DAGScheduler: ResultStage 0 (count at TestInternalRDD.scala:14) finished in 0.238 s
20/03/22 16:50:42 INFO DAGScheduler: Job 0 finished: count at TestInternalRDD.scala:14, took 0.316393 s
108
20/03/22 16:50:42 INFO SparkUI: Stopped Spark web UI at http://172.30.1.58:4040
20/03/22 16:50:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/03/22 16:50:42 INFO MemoryStore: MemoryStore cleared
20/03/22 16:50:42 INFO BlockManager: BlockManager stopped
20/03/22 16:50:42 INFO BlockManagerMaster: BlockManagerMaster stopped
20/03/22 16:50:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/03/22 16:50:42 INFO SparkContext: Successfully stopped SparkContext
20/03/22 16:50:42 INFO ShutdownHookManager: Shutdown hook called
20/03/22 16:50:42 INFO ShutdownHookManager: Deleting directory /private/var/folders/qx/84bp85pn2y5gn6_96k267psc0000gn/T/spark-de3a49e8-e75a-4c6f-8013-a6829507e0f3

JVM (Java Virtual Machine)

Java에서 프로그램을 실행 한다는 것은 컴파일 과정을 통해서 생성된 Class파일을 JVM으로 로딩하고 ByteCode를 해석(interpret) 하는 과정을 거쳐 메모리 등의 리소스를 할당하고 관리하며 정보를 처리하는 일련의 작업들을 포괄한다.

자바 가상머신

Thread 관리 및 Garbage Collection과 같은 메모리 정리 작업도 수행함.

  • 자바 애플리케이션을 클래스 로더를 통해 읽어 들여 자바 API와 함께 실행 하는것
  • JVM은 Java와 OS 사이에서 중개자 역할을 수행하여 JAVA가 OS에 구애 받지 않고 재사용을 가능하게 해줌
  • 메모리관리, Garbage collection을 수행
  • 스택 기반의 가상머신
  • "한번 작성해, 어디에서나 실행한다" 는 원칙!

JVM 구성 요소

JVM Architecture Diagram

ClassLoader를 통해 Class파일들을 로딩하고, 로딩된 Class파일들은 Execute Engine을 통해 해석 됨

해석된 프로그램은 Runtime Data Areas에 배치되어 실질적인 수행이 이루어짐

Java Source

사용자가 작성한 Java 코드 이다.

Java Compiler

Java Source를 JVM이 해석 할 수 있는 Java ByteCode로 변경

ClassLoader

JVM 내로 .class 파일들을 Load하여 Loading된 클래스들을 Runtime Data Area에 배치된다.

Execution Engine

Loading된 클래스의 ByteCode를 해석(interpret) 한다.

Runtime Data Area에 할당된 바이트 코드들은 ExecutionEngine에 의해 실행이 된다. 실행 엔진은 바이트 코드를 읽고 실행한다.

  • InterPreter : 바이트 코드명령어를 하나씩 읽어서 해석하고 실행.
    하나하나 해석은 빠르지만 전체적인 실행속도는 느리다는 단점.
  • JIT(Just In Time) Compiler : 인터프리터의 단점을 보완하기 위해 도입된것이 JIT.
    역할 : 인터프리터 방식으로 실행 되다가 같은 코드를 매번 해석하지 않고 실행 할때 컴파일을 하면서 해당코드를 NativeCode로 변경후에 캐싱한다. 이후에는 바뀐 부분만 컴파일하고 나머지는 캐싱된 코드를 사용함으로써 인터프리터의 속도를 개선할 수 있다.
  • Garbage Collector : 참조 되지 않는 오브젝트를 수집하고 제거

Runtime Data Area

JVM 이라는 프로세스가 프로그램을 수행하기 위해 OS에서 할당 받은 메모리 공간

Method Area

클래스, 변수, Method, static 변수, 상수 정보 등이 저장되는 영역이다.( 모든 Thread가 공유한다)

Heap Area

New 명령어로 생성된 인스턴스와 객체가 저장되는 구역 (GarbageCollection 이슈는 이 영역에서 일어나며, 모든 Thread가 공유)

Stack Area

Method 내에서 사용되는 값들( 매개변수, 지역변수, 리턴값) 등이 저장되는 구역으로 메소드가 호출될때 LIFO(last in first out : 마지막에 나온것이 먼저 나간다) 로 하나씩 생성되고 메소드 실행이 완료되면 LIFO로 하나씩 지워진다.

(각 Thread 별로 하나씩 생성됨)

PC Register

CPU의 Register와 역할이 비슷하다. 현재 수행중인 JVM 명령의 주소값이 저장된다( 각 Thread별로 하나씩 생성)

Native Method Stack

다른 언어(C/C++) 의 메소드 호출을 위해 할당되는 구역으로 언어에 맞게 Stack이 형성되는 구역

참고

https://d2.naver.com/helloworld/1230

JVM Performance Optimizing 및 성능 분석 사례 -1장 JVM 메모리 구조

'ProgramLanguage > Java' 카테고리의 다른 글

JNDI (Java Naming and Directory Interface)  (0) 2020.08.30
Java8- Default 메서드(Abstract, Interface)  (0) 2020.03.18
JVM 튜닝  (0) 2020.02.10
Lambda_Expression(2)  (0) 2020.01.25
객체 지향 설계 5원칙 - SOLID  (0) 2019.12.26

스파크의 특징에는 한서버에서 큰데이터의 양을 처리할수 있다고하는데 어떻게 되는건지 궁금해서 찾아 보았다.

StackOverFlow에서 검색을 한 결과 두 가지의 설명이 있다.

내가 원하는 질문은 2번이지만, 1번의 글도 해석을 해보았습니다.

 

1. 질문 : 스파크의 주 메모리에 맞출 수 없을때 스파크가 큰 파일(페타 바이트)을 읽는 방법

대답 : Spark는 작업이 호출(action)될 때만 데이터 읽기를 시작한다. 작업이 호출되면 Spark가 파티션의 데이터를 로드 한다. 동시에 불러온 파티션의 수는 사용가능한 코어 수에 따라다르다. Spark에서 1partition = 1 core = 1 로 생각 할 수도 있다. 동시에 로드된 모든 파티션은 메모리에 맞아야한다. 그렇지 않으면 OOM이 표시됨

여러 단계가 있다고 가정하면 Spark는 로드된 파티션에서만 첫번째 변환 작업을실행한다. 로드된 파티션의 데이터에 변환을 적용하려면 출력을 셔플 데이터로 저장한 다음 더 많은 파티션을 읽고 난후 모든 데이터를 읽을때까지 계속 읽는다.

transform을 적용하지 않고 count만 하는 경우에는 Spark는 파티션의 데이터를 읽지만 클러스터에 데이터를 저장하지 않으며, count를 다시 수행하면 모든 데이터를 다시 한번 읽게된다. 이럴 경우 데이터를 여러변 읽게 되는데, 이를 방지 하기 위해서, cache와 persist를 사용한다.

persist(StorageLevel.MEMORY_ONLY) 와 persist(MEMORY_AND_DISK) 두 가지가 있는데, 메모리에만 저장할 경우에는 OOM이 표기될수도 있다. 그래서 메모리와 디스크에 저장을 하는데, 데이터가 디스크에 맞지 않으면 OS는 일반적으로 executor를 kill 한다.

https://stackoverflow.com/questions/46638901/how-spark-read-a-large-file-petabyte-when-file-can-not-be-fit-in-sparks-main

 

2. 질문 : 메모리 내 계산을위한 spark의 작동 메커니즘과 혼동되지 않습니다. spark가 인 메모리 처리를 수행하는 경우 16GB의 인 메모리 스토리지가있을 때 100TB의 데이터를 계산하는 방법

대답 : Spark는 머신의 리소스 및 컴퓨팅 기능에 맞는 데이터 블록에서 작동합니다. 이 작업은 여러 번 반복되어 스파크가 데이터를 메모리에로드하고 처리 한 후 다음 데이터 청크에서 작업하는 데 필요한 경우 결과를 디스크에 다시 기록합니다.

https://stackoverflow.com/questions/54959785/how-does-spark-do-in-memory-computation-when-size-of-data-is-far-larger-than-ava

2번 질문을 보면 데이터블록에서 동작하는것이고, 데이터를 메모리에 로드하고 처리한 후 데이터 청크에서 작업을 하고 결과를디스크에 다시 기록하고, 메모리가 부족한 경우에는 디스크와 메모리를 반복적으로 사용하는 것 같은데 자세한 내용은 다음에 정리하여 올리겠습니다.

빅데이터 처리를 위한 오픈소스 병렬 분산 처리 플랫폼


반복처리와 연속으로 이루어지는 변환처리 의 고속화

스파크는 특정한 데이터 셋에 대하여 반복처리와 연속적으로 이루어지는 변환 처리를 고속화 할 목적으로 개발

스파크가 등장하기전에는 어땠을까?

데이터를 처리 하기 위해 맵리듀스(MapReduce)가 널리 활용 되어왔다.

맵리듀스는 기본적으로 입력 데이터를 스토리지에 읽고 다수의 머신에 분산처리를 수행한 결과를 스토리지에 저장한다. 이 처리가 끝나지않는 경우에는 데이터가

읽기 -> 분산처리 -> 저장 을 계속 보존하여 수행한다.

맵리듀스를 처리하면 중간결과가 항상 스토리지에 저장되서 데이터 크기가 커도 문제가 없이 동작하며, 장애발생시에도 쉽게 복구 된다.

하지만 특정 데이터의 부분집합에 대해 여러번 처리 하는 애플리케이션일 경우에는 맵리듀스로 효울적인 처리가 어렵다.

매번 중간 데이터를 다시 읽어야 하기 때문에다.이럴경우 디스크의 출력에 많은 리소스가 발생한다.

반면에 Spark는 연속적인 처리에서 불필요한 디스크와 네트워크 I/O가 발생하지 않도록 처리 한다.

처리 전체를 검토한 뒤에 그에 맞는 최적화 처리를 끼워 넣는 방식으로 설계하여 맵 리듀스 보다 고속으로 처리가 가능하다.


TODO

  1. 하둡,Spark 로 map과 filter programming 코드 구현하고 비교하기(궁금함)

시행착오에 적합한 환경제공

한 대의 머신으로 처리할 수 있는 용량보다 더 큰 데이터셋에 대해 시행 착오가 가능한 환경을 제공함

파이썬, Matlab, 등 다양한 애널리틱스 툴들은 데이터 해석을 한정적인 분석을 하기에 적합한 환경이다. 하지만 한 대 의 서버에서 처리할 수 있는 용량을 넘어서는 데이터셋에 대해서는 위의 소프트웨어들은 현실적이지 않음.

큰 데이터셋에 대해서는 스카일아웃과 같은 방법을 선택하고, 여러대의 머신으로 병렬 분산 처리를 수행할 필요가 있다.


Spark는 데이터셋을 추상적으로 다루기 위한 RDD(Resilient Distribued Dataset) 이라는 것이 있는데, RDD가 제공하는 API로 변환을 기술하기만 하면 처리가 되게 구현되어있다.


서로 다른 처리를 통합해 이용할 수 있는 환경

배치 처리, 스트림처리, SQL 처리 ,머신러닝, 그래프 처리 와 같은 형태의 애플리케이션을 하나의 환경에서 통합적으로 다룰 수 있다는 점

위의 다양한 형태의 어플리케이션을 구현하려면 각 프로그래밍 모델에 따라 애플리케이션을 별도로 구성을 해야했지만, 동일한 환경에서 사용이 가능하다.

예를 들면, SQL로 데이터 클랜징, 필터링 후에 머신러닝, 다시 SQL로 처리 하는 것을 구현해야한다고하면? 기존에는 각각의 미들웨어를 사용하여 처리하고 이동하고 복잡한구성을 만들수도 있었지만, 스파크 에서는 하나의 동일한 애플리케이션에서 구현하여 처리가 가능하다.

미들웨어의 병행에 운용하는 경우와 비교하여 운용 부담을 줄일 수 있다.


용어

Shuffle : 파티션의 위치가 변경되는 작업

SparkSession 에서 read를 하게 되면 DataFrame, Dataset으로 생성할 수 있고, RDD로 데이터를 생성하려면 SparkSession.sparkContext를 사용해서 생성해야한다.



GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.

groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.

우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.

한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.

PairRDDFunctions.scala 를 살펴 보면 다음과 같다.

groupByKey ()

  • 매우 비싼 동작

  • 그룹의 순서는 보장하지 않는다. 그리고 key는 메모리에 존재하고, 만약 키가 너무 많으면 OutofMemory가 발생한다.

**

  • Group the values for each key in the RDD into a single sequence. Allows controlling the
  • partitioning of the resulting key-value pair RDD by passing a Partitioner.
  • The ordering of elements within each group is not guaranteed, and may even differ
  • each time the resulting RDD is evaluated.
  • @note This operation may be very expensive. If you are grouping in order to perform an
  • aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey
  • or PairRDDFunctions.reduceByKey will provide much better performance.
  • @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
  • key in memory. If a key has too many values, it can result in an OutOfMemoryError.
  • /
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }

reduceByKey()

MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

테스트 예제 코드

전체 코드는 다음과 같다.

import org.apache.spark.sql.SparkSession

object Main{

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkUtil.getInstance();
    //예제 데이터 만들기
    val studentRDD = sparkSession.sparkContext.parallelize(Array(
      ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
      ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
      ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
      ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
      ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
      ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
      ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
      ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
      ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
      ("Juan", "Biology", 60)), 3);

    studentRDD.foreach(println);
    //RDD에서의 groupByData
    val groupBydata = studentRDD
      .groupBy((name)=> name._1);

    groupBydata.foreach(println)
    /* groupBydata 결과
    (Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))

    */


    // Pair RDD!!
    // pairRDD 로 변경하기 
    val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));

    //reduceByKey와 groupByKey 비교 

    //reduceByData
    val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);

    reduceByData.foreach(println);
    /*    reduceByData 결과
    (Tina,306)
    (Thomas,345)
    (Jackeline,306)
    (Joseph,330)
    (Juan,256)
    (Jimmy,308)
    (Cory,260)

    */


    //groupByData
    val groupByData = pairStudentSubjectScore.groupByKey();
    groupByData.foreach(println)
    /* groupByData 결과
        (Tina,CompactBuffer(78, 73, 68, 87))
      (Juan,CompactBuffer(63, 69, 64, 60))
      (Jimmy,CompactBuffer(69, 62, 97, 80))
      (Cory,CompactBuffer(56, 65, 71, 68))
      (Thomas,CompactBuffer(87, 93, 91, 74))
      (Jackeline,CompactBuffer(86, 62, 75, 83))
      (Joseph,CompactBuffer(83, 74, 91, 82))
    */

    val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))

    groupByDataSum.foreach(println)
        /* groupByDataSum 결과
        (Juan,256)
      (Thomas,345)
      (Jackeline,306)
      (Joseph,330)
      (Jimmy,308)
      (Cory,260)
      (Tina,306)
        */

    sparkSession.close();

  }

}

요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .

참고

  1. https://thebook.io/006908/part01/ch04/03/03/01/

  2. https://intellipaat.com/community/7677/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combinebykey

  3. https://www.streamhub.co.uk/apache-spark-tuning-manual/?fbclid=IwAR3xe6qYN0DN2FIZo0sdoPr1UjLYIL9QofCtgzcDcEAHGTZc5KOiTTy-ASs

Java8에 interface에서 default 메서드가 추가 된다면 어떻게 될까?

public abstract class AbstractTest {
    abstract void printString();
}

public interface InterFaceTest {
    default void printString(){
        System.out.println("PrintInterface");
    }
}

이럴 경우 AbstractTest를 상속받고 InterFaceTest를 구현하면 어떻게 될까?
public class Test extends AbstractTest implements InterFaceTest {

}

이럴 경우에는 IDE에서는 아래와 같은 충돌로 에러를 검출한다.
Class 'Test' must either be declared abstract or implement abstract method 'printString()' in 'AbstractSimple'
이럴 경우에는 아래와 같이 AbstractClass의 메서드를 구현한다.
public class Test extends AbstractTest implements InterFaceTest {
    @Override
    public void printString() {

    }
}

정리
- default Method 와 조상 클래스의 메서드 간의 충돌 : 인터페이스에서 디폴트 메서드는 무시된다.
- 여러 인터페이스의 디폴트 메서드간의 충돌 : 인터페이스를 구현한 클래스에서 디폴트 메서드를 오버라이딩 해야한다.

'ProgramLanguage > Java' 카테고리의 다른 글

JNDI (Java Naming and Directory Interface)  (0) 2020.08.30
JVM 메모리 구조 (1)  (0) 2020.03.22
JVM 튜닝  (0) 2020.02.10
Lambda_Expression(2)  (0) 2020.01.25
객체 지향 설계 5원칙 - SOLID  (0) 2019.12.26

DAG : 방향성 비순환 그래프

방향성은 있으니 비순환인 그래프

Spark에서는 DAG를 사용하여 프로그램을 스케줄링 하고 최적화 한다.

하둡 MapReduce와 Spark 사이의 차이를 정리

우선 Spark 에서의 DAG는 vertices (노드) 와 edges(선) 으로 이루어져있다.

Edge들은 동작을 나타내는 transform이 있다.


DAG : https://helloino.tistory.com/94

Spark의 DAG를 통해서 사용자는 스테이지 에서 세부적으로 확장이 가능하다

각각의 스테이지로 구성되는 작업을 RDD의 파티션에 기초해서 병렬로 동일한 연산을 수행한다.

HDFS같은 경우에는 데이터를 읽고 작업을 나누는 map과 다시 결합하는 reduce작업을 거치고 다시 hdfs에 기록이 된다. 이러한 경우에는 hdfs의 메모리 또는 디스크 메모리가 낭비된다.

Spark에서는 연속계산단계인 DAG가 형성된다. 이렇게 하나의 스테이지처리가 끝나면 셔플링 데이터를 최소화 하기 위해 실행 계획을 최적화 한다.

DAG 스케줄러는 운영자를 작업 단계로 나눈다. 스테이지에는 입력 데이터의 파티션을 기반으로 작업을 진행한다.

Spark가 lazy evaluation으로 수행이 되어 stage별로 실행이 되는데 이유는 action이 시작되는 시점에서 transformation 끼리의 연계를 파악해 실행 계획의 최적화가 이루어진다. 여기서 최적화는 대부분 지역성(Locality)에 관한 것이다. 
 이전의 MapReduce 같은 경우에도 map과 reduce연산 중간에 동작하는 셔플은 데이터를 전달하는데 많은 비용이 든다. 이것을 줄이기 위해 많이 신경 써야했지만, Spark는 map filter 등 Narrow transformation같은 경우에는 해당 partition에서 실행되므로 효율적인 계획을 세워서 수행을 한다.

Lineage란 ??

여기 까지 하면 transformation 밖에 없기 때문에 만들어지지는 않고 lineage라는 형태로 rdd가 형성된다. 이것이 Lineage : logical plan 이다.

모든 상위의 RDD가 파생된 RDD에 연결되는 방법을 나타내는 그래프

RDD . toDebugString 메서드로 알 수 있다.

예제

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.filter(x => x==5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:25

scala> val rdd3 = rdd2.map(x=>(x,x))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25

scala> rdd3.toDebugString
res1: String =
(8) MapPartitionsRDD[2] at map at <console>:25 []
 |  MapPartitionsRDD[1] at filter at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24


What is difference between lineage and DAG ?

dag는 physical plan 이다 . 스테이지는 또다른 어떤 스테이지에 의존하는지 에 대한 실질적인 처리 를 나타낸다.

실질적으로 Spark가 처리하는 물리적인 계획은 DAG에 그려진다.


예제) https://www.youtube.com/watch?v=NGOD7JN6azM

Spark DAG: https://data-flair.training/blogs/dag-in-apache-spark/

Spark 이점 : https://brocess.tistory.com/104

SparkSession, SparkContext, SQLContext, HiveContext 너무 많다 .


Spark2.0이후 부터는 아래와 같은 계층 구조를 가진다.

SparkSession > SparkContext > HiveContext > SQLContext


SparkSession은 spark2.X 이후부터 지원한다.


SparkSession

Spark 2.0 이후부터는 SparkSession을 사용해서 Spark Dataset, DataFrame API 를 시작할수 있다.

또한 SparkContext에서 사용 가능한 모든 기능은 SparkSession에서도 사용이 가능하다.

SparkContext를 선호한다면 SparkContext를 계속 사용 할 수 있다.

builder를 사용해서 REPL, notebooks를 만들어낼수 있으며, 기존에 존재하는 세션을 사용할 수 있다.

SparkSession.builder
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html#newSession--


SparkContext

SparkSession으로 시작을 해서 SparkContext( )를 하고 싶다면 아래와 같이

SparkSession에서 sparkContext() 메서드를 사용하면 SparkContext를 사용할 수 있다.

SparkConext는 Spark 클러스터에 대한 연결을 나타내며 해당 클러스터에서 RDD, broadcast, accumulator 등의 변수를 사용하여 사용이 가능하다.

JVM 당 하나의 SparkContext만 활성화 될 수 있다.


SparkContext의 객체를 구성

SparkContext()시스템 속성에서 설정을로드하는 SparkContext를 만듭니다 (예 : ./bin/spark-submit으로 시작할 때).
SparkContext(SparkConf config)
SparkContext(String master, String appName, SparkConf conf)일반적인 Spark 속성을 직접 설정할 수있는 대체 생성자
SparkContext(String master, String appName, String sparkHome, scala.collection.Seq jars, scala.collection.Map environment)일반적인 Spark 속성을 직접 설정할 수있는 대체 생성자


https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/SparkContext.html


Spark SQLContext

org.apache.spark.sql.SQLContext

Spark 1버전에서 는 SQLContext 는 구조적 api를 사용할때 만들어서 사용할때 사용했다.

그러나 Spark 2 부터는 SparkSession에서 접근이 바로 가능하도록 변경되었다.

//spark 1.x
SQLContext.sql(String sqlText)
//spark 2.x
SparkSession.sql(String sqlText)

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SQLContext.html


Spark HiveContext

SparkHiveContext는 가장 아래 위치

Spark SQL 실행 엔진 하나의 인스턴스 이며, hive에 저장된 데이터와 통합됨.

org.apache.spark.sql.hive
Class HiveContext
Object
  org.apache.spark.sql.SQLContext
    org.apache.spark.sql.hive.HiveContext

Hive 에 대한 구성은 conf에 hive-site.xml에서 읽기가 가능하다.

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/hive/HiveContext.html


참고

https://gankrin.org/sparksession-vs-sparkcontext-vs-sqlcontext-vs-hivecontext/

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/SparkSession.html#newSession--

'BackEnd > Spark' 카테고리의 다른 글

[Spark] GroupByKey, ReduceByKey 차이 (PairRDD)  (0) 2020.03.18
Spark Directed acyclic graph, lineage  (0) 2020.03.17
SparkSession newSession() 테스트  (0) 2020.03.15
SparkSQL Casting 타입 변환, 오류 검출  (0) 2020.03.11
Spark SQL API  (0) 2020.03.09

+ Recent posts