이전에 글인 https://deviscreen.tistory.com/78?category=789556 에서 SparkSession을 사용하여 멀티 세션을 설명 했는데 더 자세한 테스트를 진행했다.


추가 테스트

SparkSession을 처음 생성하고 이후에 newSession을 만들었다.

디버깅으로 session는 어떤 값들을 가지고 있는지 테스트 목적

SparkSession session = new SparkUtil().getSparkSession();
SparkSession newSession= session.newSession();

디버깅하면서 코드를 확인한 결과 session과 newSession는 다른 주소 값을 가지고 있다.

session = {SparkSession@4254} 
newSession = {SparkSession@4772} 

session의 값

session = {SparkSession@4254} 
 sparkContext = {SparkContext@4255} 
 existingSharedState = {None$@4256} "None"
 parentSessionState = {None$@4256} "None"
 extensions = {SparkSessionExtensions@4257} 
 creationSite = {CallSite@4258} "CallSite(getOrCreate at SparkUtil.java:13,org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:957)\nSparkUtil.<init>(SparkUtil.java:13)\nSparkMain.main(SparkMain.java:6))"
 sharedState = {SharedState@4767} 
 initialSessionOptions = {HashMap@4259} "HashMap" size = 4
 sessionState = {SessionState@4768} 
 sqlContext = {SQLContext@4261} 

newSession 의 값

newSession = {SparkSession@4772} 
 sparkContext = {SparkContext@4255} 
 existingSharedState = {Some@4774} "Some(org.apache.spark.sql.internal.SharedState@2c1d57bc)"
 parentSessionState = {None$@4256} "None"
 extensions = {SparkSessionExtensions@4257} 
 creationSite = {CallSite@4775} "CallSite(newSession at SparkMain.java:10,org.apache.spark.sql.SparkSession.newSession(SparkSession.scala:240)\nSparkMain.main(SparkMain.java:10))"
 sharedState = null
 initialSessionOptions = {HashMap@4776} "HashMap" size = 0
 sessionState = null
 sqlContext = {SQLContext@4777} 

SparkContext는 같은 값을 가지고 있지만 sqlContext는 다른 값을 가지고 있다.

이것을 보면 SparkSession의 newSession( ) 메서드는 개별의 독립된 sqlContext를 구성한다. 그리고 기본 SparkContext및 캐시된 데이터를 공유하면서 세션을 시작 할 수 있다.

'BackEnd > Spark' 카테고리의 다른 글

Spark Directed acyclic graph, lineage  (0) 2020.03.17
SparkSession, SparkContext, SQLContext, HiveContext  (0) 2020.03.15
SparkSQL Casting 타입 변환, 오류 검출  (0) 2020.03.11
Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09

sparkSQL을 사용하면 손쉽게 타입 변환을 할 수 있다.

문자열 -> INT

sparkSession.sql("select INT('23')").show(1);
결과 : 
+---------------+
|CAST(23 AS INT)|
+---------------+
|             23|
+---------------+

하지만 INT( '가나다라') 일 때는 오류를 반환하는 것이 아니라 null 로 반환을 한다.

null 로 반환하는 것이 아니라 오류를 반환 하고 싶으면 아래와 같이 사용하면된다.

SparkSession에 udf (User Define Function : 사용자 지정함수)를 만들고 등록한다.

//SparkSession에 UDF를 정의 하고 등록
sparkSession.udf().register("NumberCasting", (String str)->Integer.parseInt(str), DataTypes.IntegerType);

사용 방법은 기존의 sql 처럼 사용하면된다.

//사용 방법
sparkSession.sql("select NumberCasting('가나다라') ").show()

spark sql 의 udf를 사용하게 되면 null 값이 아닌 오류를 출력 할 수 있다.

...
Caused by: java.lang.NumberFormatException: For input string: "가나다라"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)

사용자정의 함수 만들어서 등록하기 !!

UDFRegistration.scala 에 등록되어있는 register 함수 설명

첫 번째 : 파라미터는 새롭게 만들 udf의 이름

두 번째 : value를 어떻게 변환할것인가의 사용자 정의 코드 (java 에서는 functional인터페이스 사용 가능)

세 번째 : 반환될 데이터 타입

def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
    val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
    def builder(e: Seq[Expression]) = if (e.length == 1) {
      ScalaUDF(func, returnType, e, e.map(_ => true), udfName = Some(name))
    } else {
      throw new AnalysisException("Invalid number of arguments for function " + name +
        ". Expected: 1; Found: " + e.length)
    }
    functionRegistry.createOrReplaceTempFunction(name, builder)
  }

'BackEnd > Spark' 카테고리의 다른 글

SparkSession, SparkContext, SQLContext, HiveContext  (0) 2020.03.15
SparkSession newSession() 테스트  (0) 2020.03.15
Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09
SparkUI 에러 InjectionManagerFactory not found  (0) 2020.03.06

Eclipse -No tests found with test runner JUnit 5

Junit 4로 로 테스트를 진행하는데 계속

No tests found with test runner JUnit 5 로 에러가 발생했다.

이럴때 해결하는 방법은

run configurations > JUnit > Test > 아래 Test runner 설정 > JUnit4로 변경을 해주면된다.

그리고 JUnit4 에서는 @Test 실행하려는 메소드의 접근 지정자는 꼭 public 이어야한다.

아래의 예제 대로 실행을 하면 Before, test,After 가 순차적으로 진행이 되어야한다.

public class BeforeAfter {

    @Before
    public void SetUp(){
        System.out.println("Before");
    }

    @Test
    void transformation() {
        System.out.println("test");
    }
    @After
    public void after(){
        System.out.println("After");
    }

}

하지만 test 밖에 출력이 되지 않았다. 왜 그런걸까 찾아보니 JUnit5 에서는 @Before, @After 가 @BeforeEach, @AfterEach 로 설정을 해야한다.

JUnit5 의 어노테이션으로 아래의 코드로 다시 실행을 하니 다시 실행이 된다.

public class BeforeAfter {

    @BeforeEach
    public void SetUp(){
        System.out.println("Before");
    }

    @Test
    void transformation() {
        System.out.println("test");
    }
    @AfterEach
    public void after(){
        System.out.println("After");
    }

}

SpringBoot에서 JUnit이 몇버전인지 알고 싶을때 아래로 확인하고 변경하면 된다.

생각없이 그냥 springInitializer에서 다운을 받으면 아래와 같은 설정이 있는데, springboot-2.2.X 이후부터는 Junit5로 설정이 되어있는 것 같다.

    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }

https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.2-Release-Notes

spark를 실행 하고 있으면 cleaned accumulator 숫자

INFO org.apache.spark.ContextCleaner - Cleaned accumulator 126 의 형태를 볼수 있다 무엇을 의미 하는 것 일까 ?

 

19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 126 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 145 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 152 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 130 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 144 19:17:36.899 [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 142

 

 

org.apache.spark.ContextCleaner 클래스 API 설명

 

RDD, 셔플 및 브로드캐스트 상태를 위한 비동기식 클리너

관련 object가 응용 프로그램 범위를 벗어날대, 처리 될 각 RDD, shuffleDependency 및 관심이 있는 브로드 캐스트에 대한 약한 참조는 유지됩니다.실제로 처리는 별도의 데몬 스레드에서 수행됨.

https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/ContextCleaner.html

 

stackoverflow 답변 중

 

ContextCleaner는 드라이버에서 실행 된다. SparkContext가 시작될때 작성되고 즉시 시작된다. RDD, 셔플 및 브로드캐스트 상태, accumulate를 정리하는 컨텍스트 클리너 스레드(keepCleaning 메소드 사용). context-cleaner-periodic-gc는 JVM 가비지 콜렉터를 요청함.

https://stackoverflow.com/questions/55452892/contextcleaner-cleaned-accumulator-what-does-it-mean-in-scala-spark

Git remote: Permission to

깃허브 사용중에 아래와같은 에러가 발생하는 경우가 있었다.

remote: Permission to elasticsearchstudy/SaturdaySpring.git denied to DaeyunKim.

fatal: unable to access 'https://github.com/elasticsearchstudy/SaturdaySpring.git': The requested URL returned error: 403

처음엔 프로젝트에서 권한 문제인줄알았는데

찾아보니 아래와 같이 다시 터미널에서 입력해주고 다시 시도하니 된다.

$ git remote set-url origin git@github.com:elasticsearchstudy/SaturdaySpring.git

해결 : https://stackoverflow.com/questions/47465644/github-remote-permission-denied

'BackEnd > ETC' 카테고리의 다른 글

[gradle] CreateProcess error=206  (0) 2020.04.25
http 상태 코드  (0) 2020.01.19
CQRS란 ?  (1) 2020.01.19
STORM 정리  (0) 2017.12.18

깃 관리 툴로 오픈소스를 찾다가 fork라는 소프트웨어를 찾았다.  회사와 집에서 편하게 잘 사용했지만.. 이제 조만간 유료로 바뀐다는 내용이 홈페이지에 올라왔다.

 

프로그램을 쓰면서 가장 좋았던 점은 충돌을 해결하는 방법이 간단해서 너무 좋았는데 ㅠㅠ 

아래는 머지할때 충돌나는 부분들을 선택하여 고를수 있게 되어 있다.

 

그래도 잘 사용 했으니 돈을 지불할 의향이 있다.

 

 

https://git-fork.com/

'기타?' 카테고리의 다른 글

Typora Edior 사용기  (1) 2018.05.14

SparkUI 에러 InjectionManagerFactory not found

springframework 에서 spark를 조작하는데 spark UI에서 executor를 실행하면 아래와 같은 오류가 발생한다.

Caused by: java.lang.IllegalStateException: InjectionManagerFactory not found.

    at org.glassfish.jersey.internal.inject.Injections.lambda$lookupInjectionManagerFactory$0(Injections.java:74)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.glassfish.jersey.internal.inject.Injections.lookupInjectionManagerFactory(Injections.java:74)
    at org.glassfish.jersey.internal.inject.Injections.createInjectionManager(Injections.java:69)
    at org.glassfish.jersey.server.ApplicationHandler.<init>(ApplicationHandler.java:259)
    at org.glassfish.jersey.servlet.WebComponent.<init>(WebComponent.java:311)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:154)
    at org.glassfish.jersey.servlet.ServletContainer.init(ServletContainer.java:346)
    at javax.servlet.GenericServlet.init(GenericServlet.java:203)
    at org.spark_project.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:643)
    ... 23 common frames omitted

해결방법은 아래의 dependency를 추가해주면된다.

implementation 'org.glassfish.jersey.inject:jersey-hk2:2.30'

'BackEnd > Spark' 카테고리의 다른 글

Spark SQL API  (0) 2020.03.09
cleaned accumulator [number]  (0) 2020.03.09
[Spark] RDD Persistence 와 Caching  (0) 2020.03.05
스파크 SQL 사용하여 파일로 보내기  (0) 2020.03.02
java.io.InvalidClassException: org.apache.spark.rdd.RDD  (0) 2020.02.18


RDD의 persistence, Caching에 대해서 정리

Persistence와 Cache가 무엇인가가?

RDD 를 영구적으로 저장하는 기술로 결과를 즉시 저장해두었다가 이후에 필요하면 다시 불러와서 사용이 가능하다. 이것으로 컴퓨팅오버헤드를 줄일 수 있다.

이럴때 RDD를 저장하는 방법은 cache() 와 persist() 메서드가 있다.

cache() 메서드는 모든 메모리의 모든 RDD를 저장할 수 있다. 그리고 RDD를 메모리에 유지하고 병렬작업에서 효율적으로 사용 할 수 있다.

Cache()와 Persist()의 메서드의 차이는 ?

cache()와 persist()의 차이점 : cache()를 사용하면 기본 저장 장소가 MEMORY_ONRY 이고 persist를 하면 다양한 저장소에 수준을 저장할 수 있다.

RDD의 파티션이 손실되면 원래 생성한 변환작업을 통해 다시 복구한다.

Spark 에서 Persist가 필요한 이유

RDD 와 RDD 에서 호출하는 액션들에 대한 모든 의존성을 재연산하게 되는데, 호출하는 액션들에 대한 모든의존성을 재연산하게 된다.

이때 데이터를 여러번 스캔하는 반복알고리즘들에 대해서는 매우 무거운 작업일 수 있다.

persist 함으로서 반복적인 알고리즘과 메모리의 소비를 줄 일 수 있다.

여러번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청할 수 있다.

RDD 영속화에 대한 요청을 하면 RDD를 계산한 노드들은 그 파티션들을 저장하고 있게 된다.

자바에서는 기본적으로 persist()가 데이터를 JVM 힙(heap) 에 직렬화되지 않는 객체 형태로 저장.

  




레벨  공간사용  CPU 사용시간 메모리에 저장 디스크에 저장 비고
MEMORY_ONLY  높음  낮음  예  아니오  
MEMORY_ONLY_SER    낮음 높음   아니오  
MEMORY_AND_DISK     높음 중간 일부  일부 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장
MEMORY_AND_DISK_SER     낮음 높음 일부  일부 메모리에 넣기에 데이터가 너무 많으면 디스크에 나눠 저장.메모리에 직렬화된 형태로 저장


DISK_ONLY 낮음 높음 아니오 예

persist() 호출은 연산을 강제로 수행하지않는다.

메모리에 많은 데이터를 올리려고 시도하면 스파크는 LRU 캐시 정책에 따라 오래된 파티션들을 자동으로 버림.

예제 코드

import org.apache.spark.storage.StorageLevel

val result = input map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))



Dataset<Row> 또한 데이터가 아니라 변환작업들을 저장하고 있어 불러올 때 마다 값이 달라질 수도있는데, 이것을 persist 하면 cached 된 데이터를 사용하기 때문에 다시 계산할 필요없다.

https://data-flair.training/blogs/apache-spark-rdd-persistence-caching/

+ Recent posts