한글 컬럼을 불러오고 싶다면 

` 백틱을 앞뒤에 붙여주면된다.

select `이름` from student

아래와 같은 에러가 발생하는 이슈가 있다.

Caused by: org.apache.spark.SparkException: Unable to create database default as failed to create its directory /home/morris/server/spark/spark-warehouse
        at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:114)
        at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:108)
        at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:99)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
        at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:101)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:101)
        at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:100)
        at org.apache.spark.sql.internal.SessionState.<init>(SessionState.scala:157)
        ... 26 more
Caused by: java.net.ConnectException: Call From localhost/192.168.76.102 to namenode:9000 failed on connection exception: java.net.ConnectException: 연결이 거부됨; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

위의 내용은 데이터 파일을 읽어올 때, 경로가 아래와 같이 일반 url이 적혀있어서 에러가 발생한다.
        Dataset<Row> dataset = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .load("/home/morris/data/TestData.csv");

위와 같이 아무것도 적어주지 않으면 default로 hdfs의 namenode에 접속을 한다.

로컬의 파일을 선택할 경우에는 아래와 같이 file:// 을 붙여줘야한다.

        Dataset<Row> dataset = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .load("file:///home/morris/data/TestData.csv");

SparkSQL JDBC

read.format("jdbc") 와 read.jdbc 의 차이는 뭘까 ?

결과 : 불러오는 것에 대한 코드는 같지만, format을 통해 다양한 옵션을 사용해서 JDBC에서 데이터를 불러오는 성능을 향상 시킬수 있다.

// Loading data from a JDBC source
//전체 데이터를 나눠서 사용할수 있는 옵션 설정 가능
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
//전체 데이터를 한번에
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)


// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

데이터브릭스의 홈페이지에서 이유를 알수 있었다.

Push down a query to the database engine

You can push down an entire query to the database and return just the result. The table parameter identifies the JDBC table to read. You can use anything that is valid in a SQL query FROM clause.

Scala

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

spark.read.jdbc 의 메서드를 사용하면, 전체 데이터를 가지고 오는 경우가 된다.

해당 쿼리를 하나의 Executor에서 DBMS로 쿼리를 날려서 갖고오게 되는데, 이 경우 메모리가 부족하면 GC가 발생할 수 있다.

그래서 더 좋은 방법은 spark.read.format("jdbc").option("key","value").load() 를 하는 방법도 있다. 하지만 파티션을 지정하지 않는다면 JDBC의 단일 쿼리를 사용해서 하나의 파티션에 모든 데이터를 가지고 온다. 그렇기 때문에 파티션을 설정하고 executor에 나눠서 데이터를 가지고 오는것을 추천한다.

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
    .option("partitionColumn",)
    .option("lowerBound",1)
    .option("upperBound",10000)
    .option("numberPartitions",5)
  .load()

주요 옵션 설정

partitionColumn : 데이터 집합을 분할할때 사용되는 열의 이름

lowerBound : 읽어올 최소값

upperBound : 읽을 최대값

numberPartitions : 파티션의 수

fetchSize : 원격의 데이터베이스에서 fetch되는 행 수를 제어하는 매개변수
기본 값에서 약간만 늘리면 성능이 크게 향상 될 수 있다.

값을 너무 낮게 하면 전체결과 집합을 가지고 오기 위해서 외부 데이터베이스 간에 많은 요청으로 작업부하에 지연이됨.

너무 많은 요청을 DBMS에서 핸들링 못하는 것에 대한 에러가 발생 할 수 있다.

https://docs.databricks.com/data/data-sources/sql-databases.html

https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-DataFrameReader.html

Spark Jersey dependency 충돌

이슈 : 2020-03-23 16:21:03.349 [SparkUI-28] WARN o.s.jetty.servlet.ServletHandler - Error for /api/v1/applications
java.lang.NoSuchFieldError: INCLUDE_ALL

Jersey 라이브러리와 충돌이 나서 발생했던것

2020-03-23 16:21:03.349 [SparkUI-28] WARN  o.s.jetty.servlet.ServletHandler - Error for /api/v1/applications
java.lang.NoSuchFieldError: INCLUDE_ALL
    at org.glassfish.jersey.server.ResourceConfig$State.<init>(ResourceConfig.java:114)
    at org.glassfish.jersey.server.ResourceConfig.<init>(ResourceConfig.java:356)
    at org.glassfish.jersey.servlet.WebComponent.createResourceConfig(WebComponent.java:578)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:356)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
    at org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499)
    at org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791)
    at org.spark_project.jetty.servlet.ServletHolder.prepare(ServletHolder.java:776)
    at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:580)
    at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:513)
    at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
    at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.spark_project.jetty.server.Server.handle(Server.java:539)
    at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
    at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
    at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:748)

해결 방법

기존의 build.gradle에 Jetty를 별도로추가해서 충돌이 일어나 발생하는것 같다.

예전 gradle.build

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.apache.spark:spark-core_2.11:2.4.5'
    implementation 'org.apache.spark:spark-sql_2.11:2.4.5'
    implementation 'org.apache.spark:spark-hive_2.11:2.4.5'
    implementation 'org.codehaus.janino:janino:3.0.8'
    implementation 'org.slf4j:integration:1.7.29'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    implementation 'org.postgresql:postgresql:42.2.10.jre7'
    //요부분 jersey
    implementation 'org.glassfish.jersey.inject:jersey-hk2:2.30'

    implementation 'com.google.code.gson:gson:2.8.6'
    implementation 'org.projectlombok:lombok:1.18.10'
    annotationProcessor 'org.projectlombok:lombok:1.18.10'
}

해결한 gradle.build

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.apache.spark:spark-core_2.11:2.4.5'
    implementation 'org.apache.spark:spark-sql_2.11:2.4.5'
    implementation 'org.apache.spark:spark-hive_2.11:2.4.5'
    implementation 'org.codehaus.janino:janino:3.0.8'

    implementation 'org.slf4j:integration:1.7.29'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    compile 'org.postgresql:postgresql:42.2.10.jre7'

    implementation 'com.google.code.gson:gson:2.8.6'
    implementation 'org.projectlombok:lombok:1.18.10'
    annotationProcessor 'org.projectlombok:lombok:1.18.10'
}

log 로 살펴본 Spark 동작 과정

spark가 실행되는 과정을 로그를 통해서 살펴 보는것도 의미가 있다고 생각해서 진행하고자 합니다.

시간 날때마다 정리를 해서 업로드 할 예정입니다.

코드는 아래와 같다.

val sparkSession = SparkUtil.getInstance();

//Load RDD
val readme = sparkSession.sparkContext.textFile("./data/README.md");
readme.toDebugString;
println(readme.count());

sparkSession.stop()

아래는 위 코드의 SparkLog 입니다.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/03/22 16:50:39 INFO SparkContext: Running Spark version 2.4.3
20/03/22 16:50:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/22 16:50:40 INFO SparkContext: Submitted application: Spark Basic Test
20/03/22 16:50:40 INFO SecurityManager: Changing view acls to: daeyunkim
20/03/22 16:50:40 INFO SecurityManager: Changing modify acls to: daeyunkim
20/03/22 16:50:40 INFO SecurityManager: Changing view acls groups to: 
20/03/22 16:50:40 INFO SecurityManager: Changing modify acls groups to: 
20/03/22 16:50:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(daeyunkim); groups with view permissions: Set(); users  with modify permissions: Set(daeyunkim); groups with modify permissions: Set()
20/03/22 16:50:40 INFO Utils: Successfully started service 'sparkDriver' on port 52540.
20/03/22 16:50:40 INFO SparkEnv: Registering MapOutputTracker
20/03/22 16:50:40 INFO SparkEnv: Registering BlockManagerMaster
20/03/22 16:50:40 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/03/22 16:50:40 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/03/22 16:50:40 INFO DiskBlockManager: Created local directory at /private/var/folders/qx/84bp85pn2y5gn6_96k267psc0000gn/T/blockmgr-fa3cf378-4049-4c69-8c24-7eaddb7af53e
20/03/22 16:50:40 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
20/03/22 16:50:40 INFO SparkEnv: Registering OutputCommitCoordinator
20/03/22 16:50:40 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/03/22 16:50:40 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.30.1.58:4040
20/03/22 16:50:40 INFO Executor: Starting executor ID driver on host localhost
20/03/22 16:50:40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52541.
20/03/22 16:50:40 INFO NettyBlockTransferService: Server created on 172.30.1.58:52541
20/03/22 16:50:40 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/03/22 16:50:41 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManagerMasterEndpoint: Registering block manager 172.30.1.58:52541 with 2004.6 MB RAM, BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.6 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.4 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.30.1.58:52541 (size: 20.4 KB, free: 2004.6 MB)
20/03/22 16:50:42 INFO SparkContext: Created broadcast 0 from textFile at TestInternalRDD.scala:12
20/03/22 16:50:42 INFO FileInputFormat: Total input paths to process : 1
20/03/22 16:50:42 INFO SparkContext: Starting job: count at TestInternalRDD.scala:14
20/03/22 16:50:42 INFO DAGScheduler: Got job 0 (count at TestInternalRDD.scala:14) with 2 output partitions
20/03/22 16:50:42 INFO DAGScheduler: Final stage: ResultStage 0 (count at TestInternalRDD.scala:14)
20/03/22 16:50:42 INFO DAGScheduler: Parents of final stage: List()
20/03/22 16:50:42 INFO DAGScheduler: Missing parents: List()
20/03/22 16:50:42 INFO DAGScheduler: Submitting ResultStage 0 (./data/README.md MapPartitionsRDD[1] at textFile at TestInternalRDD.scala:12), which has no missing parents
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2023.0 B, free 2004.4 MB)
20/03/22 16:50:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.30.1.58:52541 (size: 2023.0 B, free: 2004.6 MB)
20/03/22 16:50:42 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161
20/03/22 16:50:42 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (./data/README.md MapPartitionsRDD[1] at textFile at TestInternalRDD.scala:12) (first 15 tasks are for partitions Vector(0, 1))
20/03/22 16:50:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
20/03/22 16:50:42 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7932 bytes)
20/03/22 16:50:42 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7932 bytes)
20/03/22 16:50:42 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/03/22 16:50:42 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/03/22 16:50:42 INFO HadoopRDD: Input split: file:/Users/daeyunkim/Documents/SparkCode/BasicSparkScala/data/README.md:0+2243
20/03/22 16:50:42 INFO HadoopRDD: Input split: file:/Users/daeyunkim/Documents/SparkCode/BasicSparkScala/data/README.md:2243+2244
20/03/22 16:50:42 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 875 bytes result sent to driver
20/03/22 16:50:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 875 bytes result sent to driver
20/03/22 16:50:42 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 141 ms on localhost (executor driver) (1/2)
20/03/22 16:50:42 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 122 ms on localhost (executor driver) (2/2)
20/03/22 16:50:42 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/03/22 16:50:42 INFO DAGScheduler: ResultStage 0 (count at TestInternalRDD.scala:14) finished in 0.238 s
20/03/22 16:50:42 INFO DAGScheduler: Job 0 finished: count at TestInternalRDD.scala:14, took 0.316393 s
108
20/03/22 16:50:42 INFO SparkUI: Stopped Spark web UI at http://172.30.1.58:4040
20/03/22 16:50:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/03/22 16:50:42 INFO MemoryStore: MemoryStore cleared
20/03/22 16:50:42 INFO BlockManager: BlockManager stopped
20/03/22 16:50:42 INFO BlockManagerMaster: BlockManagerMaster stopped
20/03/22 16:50:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/03/22 16:50:42 INFO SparkContext: Successfully stopped SparkContext
20/03/22 16:50:42 INFO ShutdownHookManager: Shutdown hook called
20/03/22 16:50:42 INFO ShutdownHookManager: Deleting directory /private/var/folders/qx/84bp85pn2y5gn6_96k267psc0000gn/T/spark-de3a49e8-e75a-4c6f-8013-a6829507e0f3

스파크의 특징에는 한서버에서 큰데이터의 양을 처리할수 있다고하는데 어떻게 되는건지 궁금해서 찾아 보았다.

StackOverFlow에서 검색을 한 결과 두 가지의 설명이 있다.

내가 원하는 질문은 2번이지만, 1번의 글도 해석을 해보았습니다.

 

1. 질문 : 스파크의 주 메모리에 맞출 수 없을때 스파크가 큰 파일(페타 바이트)을 읽는 방법

대답 : Spark는 작업이 호출(action)될 때만 데이터 읽기를 시작한다. 작업이 호출되면 Spark가 파티션의 데이터를 로드 한다. 동시에 불러온 파티션의 수는 사용가능한 코어 수에 따라다르다. Spark에서 1partition = 1 core = 1 로 생각 할 수도 있다. 동시에 로드된 모든 파티션은 메모리에 맞아야한다. 그렇지 않으면 OOM이 표시됨

여러 단계가 있다고 가정하면 Spark는 로드된 파티션에서만 첫번째 변환 작업을실행한다. 로드된 파티션의 데이터에 변환을 적용하려면 출력을 셔플 데이터로 저장한 다음 더 많은 파티션을 읽고 난후 모든 데이터를 읽을때까지 계속 읽는다.

transform을 적용하지 않고 count만 하는 경우에는 Spark는 파티션의 데이터를 읽지만 클러스터에 데이터를 저장하지 않으며, count를 다시 수행하면 모든 데이터를 다시 한번 읽게된다. 이럴 경우 데이터를 여러변 읽게 되는데, 이를 방지 하기 위해서, cache와 persist를 사용한다.

persist(StorageLevel.MEMORY_ONLY) 와 persist(MEMORY_AND_DISK) 두 가지가 있는데, 메모리에만 저장할 경우에는 OOM이 표기될수도 있다. 그래서 메모리와 디스크에 저장을 하는데, 데이터가 디스크에 맞지 않으면 OS는 일반적으로 executor를 kill 한다.

https://stackoverflow.com/questions/46638901/how-spark-read-a-large-file-petabyte-when-file-can-not-be-fit-in-sparks-main

 

2. 질문 : 메모리 내 계산을위한 spark의 작동 메커니즘과 혼동되지 않습니다. spark가 인 메모리 처리를 수행하는 경우 16GB의 인 메모리 스토리지가있을 때 100TB의 데이터를 계산하는 방법

대답 : Spark는 머신의 리소스 및 컴퓨팅 기능에 맞는 데이터 블록에서 작동합니다. 이 작업은 여러 번 반복되어 스파크가 데이터를 메모리에로드하고 처리 한 후 다음 데이터 청크에서 작업하는 데 필요한 경우 결과를 디스크에 다시 기록합니다.

https://stackoverflow.com/questions/54959785/how-does-spark-do-in-memory-computation-when-size-of-data-is-far-larger-than-ava

2번 질문을 보면 데이터블록에서 동작하는것이고, 데이터를 메모리에 로드하고 처리한 후 데이터 청크에서 작업을 하고 결과를디스크에 다시 기록하고, 메모리가 부족한 경우에는 디스크와 메모리를 반복적으로 사용하는 것 같은데 자세한 내용은 다음에 정리하여 올리겠습니다.

빅데이터 처리를 위한 오픈소스 병렬 분산 처리 플랫폼


반복처리와 연속으로 이루어지는 변환처리 의 고속화

스파크는 특정한 데이터 셋에 대하여 반복처리와 연속적으로 이루어지는 변환 처리를 고속화 할 목적으로 개발

스파크가 등장하기전에는 어땠을까?

데이터를 처리 하기 위해 맵리듀스(MapReduce)가 널리 활용 되어왔다.

맵리듀스는 기본적으로 입력 데이터를 스토리지에 읽고 다수의 머신에 분산처리를 수행한 결과를 스토리지에 저장한다. 이 처리가 끝나지않는 경우에는 데이터가

읽기 -> 분산처리 -> 저장 을 계속 보존하여 수행한다.

맵리듀스를 처리하면 중간결과가 항상 스토리지에 저장되서 데이터 크기가 커도 문제가 없이 동작하며, 장애발생시에도 쉽게 복구 된다.

하지만 특정 데이터의 부분집합에 대해 여러번 처리 하는 애플리케이션일 경우에는 맵리듀스로 효울적인 처리가 어렵다.

매번 중간 데이터를 다시 읽어야 하기 때문에다.이럴경우 디스크의 출력에 많은 리소스가 발생한다.

반면에 Spark는 연속적인 처리에서 불필요한 디스크와 네트워크 I/O가 발생하지 않도록 처리 한다.

처리 전체를 검토한 뒤에 그에 맞는 최적화 처리를 끼워 넣는 방식으로 설계하여 맵 리듀스 보다 고속으로 처리가 가능하다.


TODO

  1. 하둡,Spark 로 map과 filter programming 코드 구현하고 비교하기(궁금함)

시행착오에 적합한 환경제공

한 대의 머신으로 처리할 수 있는 용량보다 더 큰 데이터셋에 대해 시행 착오가 가능한 환경을 제공함

파이썬, Matlab, 등 다양한 애널리틱스 툴들은 데이터 해석을 한정적인 분석을 하기에 적합한 환경이다. 하지만 한 대 의 서버에서 처리할 수 있는 용량을 넘어서는 데이터셋에 대해서는 위의 소프트웨어들은 현실적이지 않음.

큰 데이터셋에 대해서는 스카일아웃과 같은 방법을 선택하고, 여러대의 머신으로 병렬 분산 처리를 수행할 필요가 있다.


Spark는 데이터셋을 추상적으로 다루기 위한 RDD(Resilient Distribued Dataset) 이라는 것이 있는데, RDD가 제공하는 API로 변환을 기술하기만 하면 처리가 되게 구현되어있다.


서로 다른 처리를 통합해 이용할 수 있는 환경

배치 처리, 스트림처리, SQL 처리 ,머신러닝, 그래프 처리 와 같은 형태의 애플리케이션을 하나의 환경에서 통합적으로 다룰 수 있다는 점

위의 다양한 형태의 어플리케이션을 구현하려면 각 프로그래밍 모델에 따라 애플리케이션을 별도로 구성을 해야했지만, 동일한 환경에서 사용이 가능하다.

예를 들면, SQL로 데이터 클랜징, 필터링 후에 머신러닝, 다시 SQL로 처리 하는 것을 구현해야한다고하면? 기존에는 각각의 미들웨어를 사용하여 처리하고 이동하고 복잡한구성을 만들수도 있었지만, 스파크 에서는 하나의 동일한 애플리케이션에서 구현하여 처리가 가능하다.

미들웨어의 병행에 운용하는 경우와 비교하여 운용 부담을 줄일 수 있다.


용어

Shuffle : 파티션의 위치가 변경되는 작업

SparkSession 에서 read를 하게 되면 DataFrame, Dataset으로 생성할 수 있고, RDD로 데이터를 생성하려면 SparkSession.sparkContext를 사용해서 생성해야한다.



GroupByKey, ReduceByKey 이전의 스파크를 다루는 기술 이라는 책의 4번째 챕터에 나왔었는데 이런 기능이 있구나 하고 넘어갔었다.

groupByKey 변화 연산자는 동일한 키를 가진 모든 요소를 단일 키-값 쌍으로 모은 Pair RDD를 반환한다.

우선 결론은 GroupByKey는 각 키의 모든 값을 메모리로 가져오기 때문에 이 메서드를 사용할 때는 메모리 리소스를 과다하게 사용하지 않도록 주의해야한다.

한꺼번에 그루핑 할 필요가 없으면 aggregateByKey나 reduceByKey, foldByKey를 사용하는 것이 좋다. 고 설명 하고 있다.

PairRDDFunctions.scala 를 살펴 보면 다음과 같다.

groupByKey ()

  • 매우 비싼 동작

  • 그룹의 순서는 보장하지 않는다. 그리고 key는 메모리에 존재하고, 만약 키가 너무 많으면 OutofMemory가 발생한다.

**

  • Group the values for each key in the RDD into a single sequence. Allows controlling the
  • partitioning of the resulting key-value pair RDD by passing a Partitioner.
  • The ordering of elements within each group is not guaranteed, and may even differ
  • each time the resulting RDD is evaluated.
  • @note This operation may be very expensive. If you are grouping in order to perform an
  • aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey
  • or PairRDDFunctions.reduceByKey will provide much better performance.
  • @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
  • key in memory. If a key has too many values, it can result in an OutOfMemoryError.
  • /
    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }

reduceByKey()

MapReduce의 "combiner"와 마찬가지로 결과를 리듀서로 보내기 전에 각 맵퍼에서 로컬로 병합을 수행합니다.
기존 파티 셔너 병렬 처리 수준으로 출력이 해시 파티션됩니다.

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

아래 reduceByKey는 위의 메소드 안의 reduceByKey(defaultPartitioner(self),func)설명

  /**
   * Merge the values for each key using an associative and commutative reduce function. This will
   * also perform the merging locally on each mapper before sending results to a reducer, similarly
   * to a "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }

테스트 예제 코드

전체 코드는 다음과 같다.

import org.apache.spark.sql.SparkSession

object Main{

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkUtil.getInstance();
    //예제 데이터 만들기
    val studentRDD = sparkSession.sparkContext.parallelize(Array(
      ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91),
      ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62),
      ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78),
      ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
      ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91),
      ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65),
      ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86),
      ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
      ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64),
      ("Juan", "Biology", 60)), 3);

    studentRDD.foreach(println);
    //RDD에서의 groupByData
    val groupBydata = studentRDD
      .groupBy((name)=> name._1);

    groupBydata.foreach(println)
    /* groupBydata 결과
    (Tina,CompactBuffer((Tina,Maths,78), (Tina,Physics,73), (Tina,Chemistry,68), (Tina,Biology,87)))
(Thomas,CompactBuffer((Thomas,Maths,87), (Thomas,Physics,93), (Thomas,Chemistry,91), (Thomas,Biology,74)))
(Jackeline,CompactBuffer((Jackeline,Maths,86), (Jackeline,Physics,62), (Jackeline,Chemistry,75), (Jackeline,Biology,83)))
(Joseph,CompactBuffer((Joseph,Maths,83), (Joseph,Physics,74), (Joseph,Chemistry,91), (Joseph,Biology,82)))
(Juan,CompactBuffer((Juan,Maths,63), (Juan,Physics,69), (Juan,Chemistry,64), (Juan,Biology,60)))
(Jimmy,CompactBuffer((Jimmy,Maths,69), (Jimmy,Physics,62), (Jimmy,Chemistry,97), (Jimmy,Biology,80)))
(Cory,CompactBuffer((Cory,Maths,56), (Cory,Physics,65), (Cory,Chemistry,71), (Cory,Biology,68)))

    */


    // Pair RDD!!
    // pairRDD 로 변경하기 
    val pairStudentSubjectScore = studentRDD.map((student)=>(student._1,student._3));

    //reduceByKey와 groupByKey 비교 

    //reduceByData
    val reduceByData = pairStudentSubjectScore.reduceByKey((value1,value2)=>value1+value2);

    reduceByData.foreach(println);
    /*    reduceByData 결과
    (Tina,306)
    (Thomas,345)
    (Jackeline,306)
    (Joseph,330)
    (Juan,256)
    (Jimmy,308)
    (Cory,260)

    */


    //groupByData
    val groupByData = pairStudentSubjectScore.groupByKey();
    groupByData.foreach(println)
    /* groupByData 결과
        (Tina,CompactBuffer(78, 73, 68, 87))
      (Juan,CompactBuffer(63, 69, 64, 60))
      (Jimmy,CompactBuffer(69, 62, 97, 80))
      (Cory,CompactBuffer(56, 65, 71, 68))
      (Thomas,CompactBuffer(87, 93, 91, 74))
      (Jackeline,CompactBuffer(86, 62, 75, 83))
      (Joseph,CompactBuffer(83, 74, 91, 82))
    */

    val groupByDataSum = groupByData.map(value=>( value._1,value._2.sum))

    groupByDataSum.foreach(println)
        /* groupByDataSum 결과
        (Juan,256)
      (Thomas,345)
      (Jackeline,306)
      (Joseph,330)
      (Jimmy,308)
      (Cory,260)
      (Tina,306)
        */

    sparkSession.close();

  }

}

요약
ReduceByKey는 셔플 전에 데이터를 결합하므로 GroupByKey에 비해 네트워크를 통해 전송되는 데이터의 양을 최소화합니다. GroupByKey는 대부분 디스크 부족 문제의 원인입니다 .

참고

  1. https://thebook.io/006908/part01/ch04/03/03/01/

  2. https://intellipaat.com/community/7677/spark-difference-between-reducebykey-vs-groupbykey-vs-aggregatebykey-vs-combinebykey

  3. https://www.streamhub.co.uk/apache-spark-tuning-manual/?fbclid=IwAR3xe6qYN0DN2FIZo0sdoPr1UjLYIL9QofCtgzcDcEAHGTZc5KOiTTy-ASs

DAG : 방향성 비순환 그래프

방향성은 있으니 비순환인 그래프

Spark에서는 DAG를 사용하여 프로그램을 스케줄링 하고 최적화 한다.

하둡 MapReduce와 Spark 사이의 차이를 정리

우선 Spark 에서의 DAG는 vertices (노드) 와 edges(선) 으로 이루어져있다.

Edge들은 동작을 나타내는 transform이 있다.


DAG : https://helloino.tistory.com/94

Spark의 DAG를 통해서 사용자는 스테이지 에서 세부적으로 확장이 가능하다

각각의 스테이지로 구성되는 작업을 RDD의 파티션에 기초해서 병렬로 동일한 연산을 수행한다.

HDFS같은 경우에는 데이터를 읽고 작업을 나누는 map과 다시 결합하는 reduce작업을 거치고 다시 hdfs에 기록이 된다. 이러한 경우에는 hdfs의 메모리 또는 디스크 메모리가 낭비된다.

Spark에서는 연속계산단계인 DAG가 형성된다. 이렇게 하나의 스테이지처리가 끝나면 셔플링 데이터를 최소화 하기 위해 실행 계획을 최적화 한다.

DAG 스케줄러는 운영자를 작업 단계로 나눈다. 스테이지에는 입력 데이터의 파티션을 기반으로 작업을 진행한다.

Spark가 lazy evaluation으로 수행이 되어 stage별로 실행이 되는데 이유는 action이 시작되는 시점에서 transformation 끼리의 연계를 파악해 실행 계획의 최적화가 이루어진다. 여기서 최적화는 대부분 지역성(Locality)에 관한 것이다. 
 이전의 MapReduce 같은 경우에도 map과 reduce연산 중간에 동작하는 셔플은 데이터를 전달하는데 많은 비용이 든다. 이것을 줄이기 위해 많이 신경 써야했지만, Spark는 map filter 등 Narrow transformation같은 경우에는 해당 partition에서 실행되므로 효율적인 계획을 세워서 수행을 한다.

Lineage란 ??

여기 까지 하면 transformation 밖에 없기 때문에 만들어지지는 않고 lineage라는 형태로 rdd가 형성된다. 이것이 Lineage : logical plan 이다.

모든 상위의 RDD가 파생된 RDD에 연결되는 방법을 나타내는 그래프

RDD . toDebugString 메서드로 알 수 있다.

예제

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.filter(x => x==5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:25

scala> val rdd3 = rdd2.map(x=>(x,x))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:25

scala> rdd3.toDebugString
res1: String =
(8) MapPartitionsRDD[2] at map at <console>:25 []
 |  MapPartitionsRDD[1] at filter at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24


What is difference between lineage and DAG ?

dag는 physical plan 이다 . 스테이지는 또다른 어떤 스테이지에 의존하는지 에 대한 실질적인 처리 를 나타낸다.

실질적으로 Spark가 처리하는 물리적인 계획은 DAG에 그려진다.


예제) https://www.youtube.com/watch?v=NGOD7JN6azM

Spark DAG: https://data-flair.training/blogs/dag-in-apache-spark/

Spark 이점 : https://brocess.tistory.com/104

+ Recent posts