sparkSQL을 사용하면 손쉽게 타입 변환을 할 수 있다.

문자열 -> INT

sparkSession.sql("select INT('23')").show(1);
결과 : 
+---------------+
|CAST(23 AS INT)|
+---------------+
|             23|
+---------------+

하지만 INT( '가나다라') 일 때는 오류를 반환하는 것이 아니라 null 로 반환을 한다.

null 로 반환하는 것이 아니라 오류를 반환 하고 싶으면 아래와 같이 사용하면된다.

SparkSession에 udf (User Define Function : 사용자 지정함수)를 만들고 등록한다.

//SparkSession에 UDF를 정의 하고 등록
sparkSession.udf().register("NumberCasting", (String str)->Integer.parseInt(str), DataTypes.IntegerType);

사용 방법은 기존의 sql 처럼 사용하면된다.

//사용 방법
sparkSession.sql("select NumberCasting('가나다라') ").show()

spark sql 의 udf를 사용하게 되면 null 값이 아닌 오류를 출력 할 수 있다.

...
Caused by: java.lang.NumberFormatException: For input string: "가나다라"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)

사용자정의 함수 만들어서 등록하기 !!

UDFRegistration.scala 에 등록되어있는 register 함수 설명

첫 번째 : 파라미터는 새롭게 만들 udf의 이름

두 번째 : value를 어떻게 변환할것인가의 사용자 정의 코드 (java 에서는 functional인터페이스 사용 가능)

세 번째 : 반환될 데이터 타입

def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
    val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
    def builder(e: Seq[Expression]) = if (e.length == 1) {
      ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
    } else {
      throw new AnalysisException("Invalid number of arguments for function " + name +
        ". Expected: 1; Found: " + e.length)
    }
    functionRegistry.createOrReplaceTempFunction(name, builder)
  }

'BackEnd > Spark' 카테고리의 다른 글

SparkSession, SparkContext, SQLContext, HiveContext  (0) 2020.03.15
SparkSession newSession() 테스트  (0) 2020.03.15
Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09
SparkUI 에러 InjectionManagerFactory not found  (0) 2020.03.06

+ Recent posts