sparkSQL을 사용하면 손쉽게 타입 변환을 할 수 있다.
문자열 -> INT
sparkSession.sql("select INT('23')").show(1);
결과 :
+---------------+
|CAST(23 AS INT)|
+---------------+
| 23|
+---------------+
하지만 INT( '가나다라') 일 때는 오류를 반환하는 것이 아니라 null 로 반환을 한다.
null 로 반환하는 것이 아니라 오류를 반환 하고 싶으면 아래와 같이 사용하면된다.
SparkSession에 udf (User Define Function : 사용자 지정함수)를 만들고 등록한다.
//SparkSession에 UDF를 정의 하고 등록
sparkSession.udf().register("NumberCasting", (String str)->Integer.parseInt(str), DataTypes.IntegerType);
사용 방법은 기존의 sql 처럼 사용하면된다.
//사용 방법
sparkSession.sql("select NumberCasting('가나다라') ").show()
spark sql 의 udf를 사용하게 되면 null 값이 아닌 오류를 출력 할 수 있다.
...
Caused by: java.lang.NumberFormatException: For input string: "가나다라"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
사용자정의 함수 만들어서 등록하기 !!
UDFRegistration.scala 에 등록되어있는 register 함수 설명
첫 번째 : 파라미터는 새롭게 만들 udf의 이름
두 번째 : value를 어떻게 변환할것인가의 사용자 정의 코드 (java 에서는 functional인터페이스 사용 가능)
세 번째 : 반환될 데이터 타입
def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
def builder(e: Seq[Expression]) = if (e.length == 1) {
ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
} else {
throw new AnalysisException("Invalid number of arguments for function " + name +
". Expected: 1; Found: " + e.length)
}
functionRegistry.createOrReplaceTempFunction(name, builder)
}
'BackEnd > Spark' 카테고리의 다른 글
SparkSession, SparkContext, SQLContext, HiveContext (0) | 2020.03.15 |
---|---|
SparkSession newSession() 테스트 (0) | 2020.03.15 |
Spark SQL API (0) | 2020.03.09 |
cleaned accumulator [number] (0) | 2020.03.09 |
SparkUI 에러 InjectionManagerFactory not found (0) | 2020.03.06 |