Spring batch를 Parallel로 돌려보자

Monolithic 아키텍처 환경에서 가장 잘 돌아가는 어플리케이션 가운데 하나가 배치 작업이다. 모든 데이터와 처리 로직들이 한군데에 모여있기 때문에 최소한의 비용으로 빠르게 기능을 돌릴 수 있다. 데이터 존재하는 Big Database에 접근하거나 Super Application Server에 해당 기능의 수행을 요청하면 된다. 끝!!!

하지만 요즘의 우리가 개발하는 어플리케이션들은 R&R이 끝없이 분리된 Microservices 아키텍처의 세상에서 숨쉬고 있다. 배치가 실행될려면 이 서비스, 저 서비스에 접근해서 데이터를 얻어야 하고, 얻은 데이터를 다른 서비스의 api endpoint를 호출해서 최종적인 뭔가가 만들어지도록 해야한다.  문제는 시간이다!

마이크로서비스 환경에서 시간이 문제가 되는 요인은 여러가지가 있을 수 있다. 배치는 태생적으로 대용량의 데이터를 가지고 실행한다. 따라서 필요한 데이터를 획득하는게 관건이다. 이 데이터를 빠르게 획득할 수 없다면 배치의 실행 속도는 느려지게 된다. 다들 아는 바와 같이 마이크로서비스 환경이 일이 돌아가는 방식은 Big Logic의 실행이 아니라 여러 시스템으로 나뉘어진 Logic간의 Collaboration이다. 그리고 이 연동은 대부분 RESTful을 기반으로 이뤄진다.

RESTful이란 뭔가? HTTP(S) over TCP를 기반으로 한 웹 통신이다. 웹 통신의 특징은 Connectionless이다. (경우에 따라 Connection oriented) 방식이 있긴 하지만, 이건 아주 특수한 경우에나 해당한다. TCP 통신에서 가장 비용이 많이 들어가는 과정은 Connection setup 비용인데, RESTful api를 이용하는 과정에서는 API Call이 매번 발생할 때마다 계속 연결을 새로 맺어야 한다. (HTTP 헤더를 적절히 제어하면 이를 극복할 수도 있을 것 같지만 개발할 때 이를 일반적으로 적용하지는 않기 때문에 일단 스킵. 하지만 언제고 따로 공부해서 적용해봐야할 아젠다인 것 같기는 하다.)

따라서 Monolithic 환경과 같이 특정 데이터베이스들에 연결을 맺고, 이를 읽어들여 처리하는 방식과는 확연하게 대량 데이터를 처리할 때 명확하게 속도 저하가 발생한다. 그것도 아주 심각하게.

다시 말하지만 배치에서 속도는 생명이다. 그러나 개발자는 마이크로서비스를 사랑한다. 이 괴리를 맞출려면…

  1. 병렬처리를 극대화한다.
  2. 로직을 고쳐서 아예 데이터의 수를 줄인다.

근본적인 처방은 두번째 방법이지만, 시간이 별로 없다면 어쩔 수 없다. 병렬 처리로 실행하는 방법을 쓰는 수밖에…
병렬로 실해시키는 가장 간단한 방법은 ThreadPool이다. Springbatch에서 사용 가능한 TaskExecutor 가운데 병렬 처리를 가능하게 해주는 클래스들이 있다.

  • SimpleAsyncTaskExecutor – 필요에 따라 쓰레드를 생성해서 사용하는 방식이다. 연습용이다. 대규모 병렬 처리에는 비추다.
  • ThreadPoolTaskExecutor – 쓰레드 제어를 위한 몇 가지 설정들을 제공한다. 대표적으로 풀을 구성하는 쓰레드의 개수를 정할 수 있다!!! 이외에도 실행되는 작업이 일정 시간 이상 걸렸을 때 이를 종료시킬 수 있는 기능들도 지원하지만… 이런 속성들의 경우에는 크게 쓸일은 없을 것 같다.

제대로 할려면 ThreadPoolTaskExecutor를 사용하는게 좋을 것 같다. 병렬 처리 가능한 TaskExecutor들은 AsyncTaskExecutor 인터페이스 페이지를 읽어보면 알 수 있다.

@Bean(name = "candidateTaskPool")
public TaskExecutor executor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(CORE_TASK_POOL_SIZE);
    executor.setMaxPoolSize(MAX_TASK_POOL_SIZE);
    return executor;
}

이 메소드 정의를 Job Configuration 객체에 반영하면 된다. ThreadPool을 생성할 때 한가지 팁은 Pool을 @Bean annotation을 이용해서 잡아두는게 훨씬 어플리케이션 운영상에 좋다. 작업을 할 때마다 풀을 다시 생성시키는 것이 Cost가 상당하니 말이다. 어떤 Pool이든 매번 만드는 건 어플리케이션 건강에 해롭다.

전체 배치 코드에 이 부분이 어떻게 녹아들어가는지는 아래 코드에서 볼 수 있다.

@Configuration
@EnableBatchProcessing
    public class CandidateJobConfig {
    public static final int CORE_TASK_POOL_SIZE = 24;
    public static final int MAX_TASK_POOL_SIZE = 128;
    public static final int CHUNK_AND_PAGE_SIZE = 400;

    @Bean(name = "candidateTaskPool")
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_TASK_POOL_SIZE);
        executor.setMaxPoolSize(MAX_TASK_POOL_SIZE);
        return executor;
    }

    @Bean(name = "candidateStep")
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<User> reader,
                     ItemProcessor<User, Candidate> processor,
                     ItemWriter<Candidate> writer) {
        return stepBuilderFactory.get("candidateStep")
                                 .<User, Candidate>chunk(CHUNK_AND_PAGE_SIZE)
                                 .reader(candidateReader)
                                 .processor(candidateProcessor)
                                 .writer(candidateWriter)
                                 .taskExecutor(executor())
                                 .build();
    }

이렇게 하면 간단하다.

하지만 이게 다는 아니다. 이 코드는 다중 쓰레드를 가지고 작업을 병렬로 돌린다. 하지만 이 코드에는 한가지 문제점이 있다. 살펴보면 Chunk라는 단위로 작업이 실행된다는 것을 알 수 있다. Chunk는 데이터를 한개씩 읽는게 아니라 한꺼번에 여러 개(이 예제에서는 CHUNK_AND_PAGE_SIZE)씩 읽어 processor를 통해 실행한다. 배치의 실제 구현에 대한 이해나 고려가 필요하다.

Chunk를 사용해서 IO의 효율성을 높이는 방법은 흔하게 사용되는 방법이다. 하지만 입력 데이터를 Serialized된 형태로 읽어들여야 하는 경우라면 좀 더 고려가 필요하다. MultiThread 방식으로 배치가 실행되면 각 쓰레드들은 자신의 Chunk를 채우기 위해서 Reader를 호출한다. 만약 한번에 해당 Chunk가 채워지지 않으면 추가적인 데이터를 Reader에게 요청한다. 이 과정에서 쓰레드간 Race condition이 발생하고, 결국 읽는 과정에서 오류가 발생될 수 있다. 예를 들어 입력으로 단순 Stream Reader 혹은 RDBMS의 Cursor를 이용하는 경우에는.

문제가 된 케이스에서는 JDBCCursorItemReader를 써서 Reader를 구현하였다. 당연히 멀티 쓰레드 환경에 걸맞는 Synchronization이 없었기 때문에 Cursor의 내부 상태가 뒤죽박죽되어 Exception을 유발시켰다.

가장 간단한 해결 방법은 한번 읽어들일 때 Chunk의 크기와 동일한 크기의 데이터를 읽어들이도록 하는 방법이다. Tricky하지만 상당히 효율적인 방법이다.  Cursor가 DBMS Connection에 의존적이기 때문에 개별 쓰레드가 연결을 따로 맺어서 Cursor를 관리하는 것도 다른 방법일 수 있겠지만, 이러면 좀 많이 복잡해진다. ㅠㅠ 동일한 크기의 데이터를 읽어들이기 위해 Paging 방식으로 데이터를 읽어들일 수 있는 JDBCPagingItemReader 를 사용한다. 관련된 샘플은 다음 두개의 링크를 참고하면 쉽게 적용할 수 있다.

이걸 바탕으로 구현한 예제 Reader 코드는 아래와 같다. 이전에 작성한 코드는 이런 모양이다.

Before

public JdbcCursorItemReader<User> itemReader(DataSource auditDataSource,
                                                @Value("#{jobExecutionContext['oldDate']}") final Date oldDate) {
    JdbcCursorItemReader<User> reader = new JdbcCursorItemReader();
    String sql = "SELECT user_name, last_login_date FROM user WHERE last_login_date < '%s'";
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    reader.setSql(String.format(sql, sdf.format(elevenMonthAgoDate)));
    reader.setDataSource(auditDataSource);
    ...
}

After

public JdbcPagingItemReader<User> itemReader(DataSource auditDataSource,
                                             @Value("#{jobExecutionContext['oldDate']}") final Date oldDate) {
    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader();

    SqlPagingQueryProviderFactoryBean factory = new SqlPagingQueryProviderFactoryBean();
    factory.setDataSource(auditDataSource);
    factory.setSelectClause("SELECT user_name, last_login_date ");
    factory.setFromClause("FROM user ");
    factory.setWhereClause(String.format("WHERE last_login_date < '%s'", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(oldDate)));
    factory.setSortKey("user_name");

    reader.setQueryProvider(factory.getObject());
    reader.setDataSource(auditDataSource);
    reader.setPageSize(CHUNK_AND_PAGE_SIZE);

    ...
}

여기에서 가장 핵심은 CHUNK_AND_PAGE_SIZE라는 Constant다. 이름에서 풍기는 의미를 대강 짐작하겠지만, Chunk에서 읽어들이는 값과 한 페이지에서 읽어들이는 개수가 같아야 한다는 것이다. 이러면 단일 Cursor를 사용하더라도 실행 Thread간의 경합 문제없이 간단히 문제를 잡을 수 있다. 하지만 명심할 건 이것도 뽀록이라는 사실.
이렇게 문제는 해결했지만 과연 마이크로서비스 환경의 배치로서 올바른 모습인가에 대해서는 의구심이 든다. 기존의 배치는 Monolithic 환경에서 개발되었고, 가급적 손을 덜 들이는 관점에서 접근하고 싶어서 쓰레드를 대량으로 투입해서 문제를 해결하긴 했다. 젠킨스를 활용하고, 별도의 어플리케이션 서버를 만들어서 구축한 시스템적인 접근 방법이 그닥 구린건 아니지만 태생적으로 다음의 문제점들이 있다는 생각이 작업중에 들었다.

  • SpringBatch가 기존의 주먹구구식 배치를 구조화시켜서 이쁘게 만든건 인정한다. 하지만 개별 서버의 한계를 넘어서지는 못했다.  한대의 장비라는 한계. 이게 문제다. 성능이 아무리 좋다고 하더라도 한대 장비에서 커버할 수 있는 동시 작업의 한계는 분명하다. 더구나 안쓰고 있는데도 장비가 물려있어야 하니 이것도 좀 낭비인 것 같기도 하고…
  • Microservice 환경의 Transaction cost는 기존 Monolithic에 비해 감내하기 힘든 Lagging effect를 유발시킨다. 그렇다고 개별 개별 서비스의 DB 혹은 Repository를 헤집으면서 데이터를 처리하는건 서비스간의 Indepedency를 유지해야한다는 철학과는 완전 상반된 짓이다. 구린 냄새를 풍기면서까지 이런 짓을 하고 싶지는 않다. Asynchronous하고, Parallelism을 충분히 지원할 수 있는 배치 구조가 필요하다.

한번에 다 할 수 없고, 일단 생각꺼리만 던져둔다. 화두를 던져두면 언젠가는 이야기할 수 있을거고, 재대로 된 방식을 직접 해보거나 대안제를 찾을 수 있겠지.