SparkSQL JDBC

What is the difference between read.format("jdbc") and read.jdbc?

Result: the code that loads the data looks essentially the same, but with the format-based reader you can set various options that improve the performance of loading data over JDBC.

// Loading data from a JDBC source
// Options can be set here so the data is split across partitions
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
// Loads the entire table at once (read.jdbc without partitioning options)
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)


// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

The Databricks documentation explains the reason.

Push down a query to the database engine

You can push down an entire query to the database and return just the result. The table parameter identifies the JDBC table to read. You can use anything that is valid in a SQL query FROM clause.

Scala

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)
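
Since Spark 2.4 the same pushdown can also be expressed with the option API. The sketch below reuses the hypothetical url/user/password values from the earlier examples and uses the "query" option to carry the statement instead of a parenthesized dbtable:

// Sketch: pushdown via the "query" option (Spark 2.4+), reusing the example
// connection values from above. Spark wraps the statement as a subquery and
// only the query result is transferred back to the executors.
val pushdownDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("user", "username")
  .option("password", "password")
  .option("query", "select * from employees where emp_no < 10008")
  .load()
pushdownDF.show()

Note that the query option cannot be combined with dbtable or partitionColumn.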

When the spark.read.jdbc method is used without partitioning options, the entire table is fetched in one go.

The query is sent to the DBMS from a single executor, and if that executor runs short of memory, heavy GC can occur.

A better approach is spark.read.format("jdbc").option("key", "value").load(). However, if no partitioning is specified there either, a single JDBC query still pulls all of the data into one partition. It is therefore recommended to configure partitioning so that the data is fetched in parallel across executors.

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "id")  // example column; must be numeric, date, or timestamp
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "5")
  .load()

Key options

partitionColumn: the name of the column used to split the dataset; it must be a numeric, date, or timestamp column.

lowerBound: the lower bound used to compute the partition stride.

upperBound: the upper bound used to compute the partition stride. The bounds only decide how the ranges are cut; rows outside them are not filtered out.

numPartitions: the number of partitions, and therefore the number of parallel JDBC queries.

fetchSize: controls how many rows are fetched from the remote database per round trip.
Increasing it slightly from the default can improve performance significantly.

If the value is too low, fetching the full result set takes many round trips to the external database, which slows the workload down.

Errors can also occur when the DBMS cannot handle that many requests.
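
As a minimal sketch of how these options fit together (assuming the same hypothetical PostgreSQL table and an integer id column as in the customSchema example above), a partitioned read with a tuned fetch size looks like this:

// Sketch: partitioned JDBC read with a tuned fetch size.
// Spark splits the [lowerBound, upperBound) range into numPartitions roughly equal
// id ranges and issues one query per partition; rows outside the bounds still land
// in the first/last partitions rather than being filtered out.
val partitionedDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "5")
  .option("fetchsize", "1000")   // rows per round trip to the remote database
  .load()
println(partitionedDF.rdd.getNumPartitions)  // should print 5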

https://docs.databricks.com/data/data-sources/sql-databases.html

https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-DataFrameReader.html

Spark Jersey dependency conflict

Issue: 2020-03-23 16:21:03.349 [SparkUI-28] WARN o.s.jetty.servlet.ServletHandler - Error for /api/v1/applications
java.lang.NoSuchFieldError: INCLUDE_ALL

It turned out to be caused by a conflict with the Jersey library.

2020-03-23 16:21:03.349 [SparkUI-28] WARN  o.s.jetty.servlet.ServletHandler - Error for /api/v1/applications
java.lang.NoSuchFieldError: INCLUDE_ALL
    at org.glassfish.jersey.server.ResourceConfig$State.<init>(ResourceConfig.java:114)
    at org.glassfish.jersey.server.ResourceConfig.<init>(ResourceConfig.java:356)
    at org.glassfish.jersey.servlet.WebComponent.createResourceConfig(WebComponent.java:578)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:356)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:177)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:369)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
    at org.spark_project.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:499)
    at org.spark_project.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:791)
    at org.spark_project.jetty.servlet.ServletHolder.prepare(ServletHolder.java:776)
    at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:580)
    at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:513)
    at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
    at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.spark_project.jetty.server.Server.handle(Server.java:539)
    at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
    at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
    at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:748)

Solution

It seems the conflict happened because Jersey (jersey-hk2) had been added separately to the existing build.gradle, clashing with the Jersey version that Spark already brings in.

Previous build.gradle

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.apache.spark:spark-core_2.11:2.4.5'
    implementation 'org.apache.spark:spark-sql_2.11:2.4.5'
    implementation 'org.apache.spark:spark-hive_2.11:2.4.5'
    implementation 'org.codehaus.janino:janino:3.0.8'
    implementation 'org.slf4j:integration:1.7.29'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    implementation 'org.postgresql:postgresql:42.2.10.jre7'
// this jersey dependency is the one that caused the conflict
    implementation 'org.glassfish.jersey.inject:jersey-hk2:2.30'

    implementation 'com.google.code.gson:gson:2.8.6'
    implementation 'org.projectlombok:lombok:1.18.10'
    annotationProcessor 'org.projectlombok:lombok:1.18.10'
}

Fixed build.gradle (the jersey-hk2 dependency removed)

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'org.apache.spark:spark-core_2.11:2.4.5'
    implementation 'org.apache.spark:spark-sql_2.11:2.4.5'
    implementation 'org.apache.spark:spark-hive_2.11:2.4.5'
    implementation 'org.codehaus.janino:janino:3.0.8'

    implementation 'org.slf4j:integration:1.7.29'
    implementation 'ch.qos.logback:logback-classic:1.2.3'
    compile 'org.postgresql:postgresql:42.2.10.jre7'

    implementation 'com.google.code.gson:gson:2.8.6'
    implementation 'org.projectlombok:lombok:1.18.10'
    annotationProcessor 'org.projectlombok:lombok:1.18.10'
}
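
As a quick sanity check after the fix, the SparkUI REST endpoint that was failing can be queried directly. This is only a sketch, assuming a local SparkSession and the default UI port 4040:

// Sketch: verify that /api/v1/applications responds once the Jersey conflict is gone.
import org.apache.spark.sql.SparkSession
import scala.io.Source

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("jersey-check")
  .getOrCreate()

// The Spark status REST API is served by the SparkUI while the session is running.
val json = Source.fromURL("http://localhost:4040/api/v1/applications").mkString
println(json)  // should print a JSON array describing the running application

spark.stop()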

Spark's execution process, traced through the logs

I think it is worthwhile to walk through what happens when Spark runs by reading its logs, so that is what I intend to do here.

I plan to clean this up and upload more whenever I find the time.

The code is as follows.

val sparkSession = SparkUtil.getInstance()

// Load the README file as an RDD
val readme = sparkSession.sparkContext.textFile("./data/README.md")
// Build the lineage description (note: the result is not printed here)
readme.toDebugString
// Trigger the job and print the number of lines
println(readme.count())

sparkSession.stop()

Below is the Spark log produced by the code above.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/03/22 16:50:39 INFO SparkContext: Running Spark version 2.4.3
20/03/22 16:50:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/03/22 16:50:40 INFO SparkContext: Submitted application: Spark Basic Test
20/03/22 16:50:40 INFO SecurityManager: Changing view acls to: daeyunkim
20/03/22 16:50:40 INFO SecurityManager: Changing modify acls to: daeyunkim
20/03/22 16:50:40 INFO SecurityManager: Changing view acls groups to: 
20/03/22 16:50:40 INFO SecurityManager: Changing modify acls groups to: 
20/03/22 16:50:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(daeyunkim); groups with view permissions: Set(); users  with modify permissions: Set(daeyunkim); groups with modify permissions: Set()
20/03/22 16:50:40 INFO Utils: Successfully started service 'sparkDriver' on port 52540.
20/03/22 16:50:40 INFO SparkEnv: Registering MapOutputTracker
20/03/22 16:50:40 INFO SparkEnv: Registering BlockManagerMaster
20/03/22 16:50:40 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/03/22 16:50:40 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/03/22 16:50:40 INFO DiskBlockManager: Created local directory at /private/var/folders/qx/84bp85pn2y5gn6_96k267psc0000gn/T/blockmgr-fa3cf378-4049-4c69-8c24-7eaddb7af53e
20/03/22 16:50:40 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
20/03/22 16:50:40 INFO SparkEnv: Registering OutputCommitCoordinator
20/03/22 16:50:40 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/03/22 16:50:40 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.30.1.58:4040
20/03/22 16:50:40 INFO Executor: Starting executor ID driver on host localhost
20/03/22 16:50:40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52541.
20/03/22 16:50:40 INFO NettyBlockTransferService: Server created on 172.30.1.58:52541
20/03/22 16:50:40 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/03/22 16:50:41 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManagerMasterEndpoint: Registering block manager 172.30.1.58:52541 with 2004.6 MB RAM, BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.30.1.58, 52541, None)
20/03/22 16:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.6 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.4 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.30.1.58:52541 (size: 20.4 KB, free: 2004.6 MB)
20/03/22 16:50:42 INFO SparkContext: Created broadcast 0 from textFile at TestInternalRDD.scala:12
20/03/22 16:50:42 INFO FileInputFormat: Total input paths to process : 1
20/03/22 16:50:42 INFO SparkContext: Starting job: count at TestInternalRDD.scala:14
20/03/22 16:50:42 INFO DAGScheduler: Got job 0 (count at TestInternalRDD.scala:14) with 2 output partitions
20/03/22 16:50:42 INFO DAGScheduler: Final stage: ResultStage 0 (count at TestInternalRDD.scala:14)
20/03/22 16:50:42 INFO DAGScheduler: Parents of final stage: List()
20/03/22 16:50:42 INFO DAGScheduler: Missing parents: List()
20/03/22 16:50:42 INFO DAGScheduler: Submitting ResultStage 0 (./data/README.md MapPartitionsRDD[1] at textFile at TestInternalRDD.scala:12), which has no missing parents
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 2004.4 MB)
20/03/22 16:50:42 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2023.0 B, free 2004.4 MB)
20/03/22 16:50:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.30.1.58:52541 (size: 2023.0 B, free: 2004.6 MB)
20/03/22 16:50:42 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161
20/03/22 16:50:42 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (./data/README.md MapPartitionsRDD[1] at textFile at TestInternalRDD.scala:12) (first 15 tasks are for partitions Vector(0, 1))
20/03/22 16:50:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
20/03/22 16:50:42 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7932 bytes)
20/03/22 16:50:42 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7932 bytes)
20/03/22 16:50:42 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/03/22 16:50:42 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
20/03/22 16:50:42 INFO HadoopRDD: Input split: file:/Users/daeyunkim/Documents/SparkCode/BasicSparkScala/data/README.md:0+2243
20/03/22 16:50:42 INFO HadoopRDD: Input split: file:/Users/daeyunkim/Documents/SparkCode/BasicSparkScala/data/README.md:2243+2244
20/03/22 16:50:42 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 875 bytes result sent to driver
20/03/22 16:50:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 875 bytes result sent to driver
20/03/22 16:50:42 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 141 ms on localhost (executor driver) (1/2)
20/03/22 16:50:42 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 122 ms on localhost (executor driver) (2/2)
20/03/22 16:50:42 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/03/22 16:50:42 INFO DAGScheduler: ResultStage 0 (count at TestInternalRDD.scala:14) finished in 0.238 s
20/03/22 16:50:42 INFO DAGScheduler: Job 0 finished: count at TestInternalRDD.scala:14, took 0.316393 s
108
20/03/22 16:50:42 INFO SparkUI: Stopped Spark web UI at http://172.30.1.58:4040
20/03/22 16:50:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/03/22 16:50:42 INFO MemoryStore: MemoryStore cleared
20/03/22 16:50:42 INFO BlockManager: BlockManager stopped
20/03/22 16:50:42 INFO BlockManagerMaster: BlockManagerMaster stopped
20/03/22 16:50:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/03/22 16:50:42 INFO SparkContext: Successfully stopped SparkContext
20/03/22 16:50:42 INFO ShutdownHookManager: Shutdown hook called
20/03/22 16:50:42 INFO ShutdownHookManager: Deleting directory /private/var/folders/qx/84bp85pn2y5gn6_96k267psc0000gn/T/spark-de3a49e8-e75a-4c6f-8013-a6829507e0f3
