SparkSQL JDBC

read.format("jdbc") 와 read.jdbc 의 차이는 뭘까 ?

결과 : 불러오는 것에 대한 코드는 같지만, format을 통해 다양한 옵션을 사용해서 JDBC에서 데이터를 불러오는 성능을 향상 시킬수 있다.

// Loading data from a JDBC source
//전체 데이터를 나눠서 사용할수 있는 옵션 설정 가능
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
//전체 데이터를 한번에
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)


// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

데이터브릭스의 홈페이지에서 이유를 알수 있었다.

Push down a query to the database engine

You can push down an entire query to the database and return just the result. The table parameter identifies the JDBC table to read. You can use anything that is valid in a SQL query FROM clause.

Scala

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

spark.read.jdbc 의 메서드를 사용하면, 전체 데이터를 가지고 오는 경우가 된다.

해당 쿼리를 하나의 Executor에서 DBMS로 쿼리를 날려서 갖고오게 되는데, 이 경우 메모리가 부족하면 GC가 발생할 수 있다.

그래서 더 좋은 방법은 spark.read.format("jdbc").option("key","value").load() 를 하는 방법도 있다. 하지만 파티션을 지정하지 않는다면 JDBC의 단일 쿼리를 사용해서 하나의 파티션에 모든 데이터를 가지고 온다. 그렇기 때문에 파티션을 설정하고 executor에 나눠서 데이터를 가지고 오는것을 추천한다.

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
    .option("partitionColumn",)
    .option("lowerBound",1)
    .option("upperBound",10000)
    .option("numberPartitions",5)
  .load()

주요 옵션 설정

partitionColumn : 데이터 집합을 분할할때 사용되는 열의 이름

lowerBound : 읽어올 최소값

upperBound : 읽을 최대값

numberPartitions : 파티션의 수

fetchSize : 원격의 데이터베이스에서 fetch되는 행 수를 제어하는 매개변수
기본 값에서 약간만 늘리면 성능이 크게 향상 될 수 있다.

값을 너무 낮게 하면 전체결과 집합을 가지고 오기 위해서 외부 데이터베이스 간에 많은 요청으로 작업부하에 지연이됨.

너무 많은 요청을 DBMS에서 핸들링 못하는 것에 대한 에러가 발생 할 수 있다.

https://docs.databricks.com/data/data-sources/sql-databases.html

https://medium.com/@radek.strnad/tips-for-using-jdbc-in-apache-spark-sql-396ea7b2e3d3

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-DataFrameReader.html

+ Recent posts