티스토리 뷰

Backend

Linger 로 오버헤드 줄이기

지마켓 선현상 2021. 7. 14. 09:20

Linger

Kafka 에 보면 linger 라는 개념이 나옵니다. 프로듀서가 메세지를 전송할 때, 전송을 위한 통신 오버헤드를 줄이기 위해 메세지를 어느 정도 모아서 보내는 방식입니다. 네트워크 비용은 대체적으로 큰 비용에 속하기 때문에 이를 줄이기 위한 방법입니다. 이 컨셉은 kafka 의 여러 곳에서 등장합니다. 기능만큼이나 성능 측면에서 비용을 낮추기 위해 여러 방안을 사용한 듯 싶습니다.

 

네트워크 비용만큼이나 비싼 비용은 바로 DB 비용입니다. 특히 RDB 에서는 원하는 목표를 위한 큰 비용을 마주하곤 합니다. 도메인의 특이한 제약은 종종 겪곤 하는데 저도 이번 경우에 몇몇 제약적인 상황 하에 RDB 의 비용을 효율적으로 다뤄보기 위해 linger 전략을 도입한 사례를 소개합니다. Linger 는 경쟁 조건을 해결하고 비동기를 다루는 방식이 필요하여 이를 위한 좋은 사례도 되었습니다.

 

비싼 오버헤드

모든 비용을 linger 같은 방식으로 줄일 수 있지는 않을 것입니다. 아마도 오버헤드가 있는 경우 그리고 그 오버헤드가 비싼 경우가 해당될 듯 합니다.

빨간 블럭은 메세지 수행에 필요한 오버헤드이고 노란 블럭은 실제 메세지를 처리하기 위한 수행시간입니다. 예시에서 보듯 3개의 메세지를 처리하는 과정에 이를 모아서 수행하는 것이 각각을 즉시 수행하기보다 좀 더 짧은 수행 시간을 보이는 상황을 이해해볼 수 있습니다.

 

예시에는 이런 방식의 단점들도 확인할 수 있습니다. 모든 메세지의 처리는 마지막 메세지의 처리가 끝나는 시점에 확인할 수 있습니다. 즉, 세 메세지의 평균 처리 시간은 오히려 길어질 수도 있습니다.

 

재고 차감

재고를 차감하는 과정은 다음과 같습니다.

  1. 현재 재고를 확인한다.
  2. 차감하려는 수량이 차감 가능한지 판단한다.
  3. 차감 가능하다면 재고를 차감한다.

이 과정에는 읽기, 판단, 쓰기 가 있으며 재고에 dirty read 를 발생시키지 않으려면 이 행위를 원자적으로 수행해야 합니다. 경쟁 조건 속에서 이 격리 수준을 유지하는 것은 꽤 큰 비용이 발생합니다. 제가 직면한 문제도 바로 이 부분이었습니다.

 

이를 개선하려는 측면에서 경쟁을 줄이고 순차적으로 수행 가능하도록 요청을 모아서 수행한다면 어떨까 하는 생각을 하였습니다.

 

linger 의 요소

먼저, 다건일 때 오버헤드가 줄어드는 처리 방식을 마련합니다. 재고 차감의 과정에서는 원자적으로 수행하되 여러 요청을 순차적으로 처리 가능한 procedure 로 준비하였습니다. 이 내용은 설명하고자 하는 내용이 아니므로 자세한 설명을 덧붙이지는 않겠습니다.

 

소개할 내용은 요청을 적당히 모으고 트리거 조건 시 이를 한 번에 수행하는 코드입니다. 추가적으로 이 요청들은 각기 다른 요청이기에 수행 결과와 짝을 맺어 각 요청의 응답을 전달하는 것도 필요하였습니다.

 

linger 는 기본적으로 요청을 모으기 위한 buffer 와 모인 요청을 수행하기 위한 action 그리고 수행할 시점을 위한 트리거 조건으로 구성합니다.

  • buffer 는 여러 쓰레드에서 접근할 수 있으니 동시성 제어를 고려해야 합니다.
  • action 을 수행하기 위해서는 별도의 실행 쓰레드를 마련합니다.
  • 트리거 조건은 두 가지입니다. buffer 에 요청이 쌓이기 시작한지 특정 시간이 지난 후와 그 전에라도 특정 수의 요청이 누적되면 즉시 수행합니다.

 

기본 인터페이스

먼저 linger 를 사용하는 패턴을 고려합니다. 일단 크게 두 가지 측면을 고려해야 합니다. 매 실행을 linger 에 넣는 방법과 linger 가 트리거 될 때, 해당 대상 건들을 일괄 처리하는 방법입니다.

public class Linger<T, R> {

  private final Function<List<T>, List<R>> action;
  // ...

  public static <T, R> Linger<T, R> of(final Function<List<T>, List<R>> action, /* ... */) {
    return new Linger<>(action, /* ... */);
  }
  // ...

  public CompletableFuture<R> applyAsync(final T param) {
    // ...
  }
}

action 은 순서를 가진 수행 대상들을 받아 순서를 가진 결과를 반환하는 함수를 제공받아 사용합니다. #applyAsync(T) 메서드는 매번 실행을 요청하는 메서드로 CompletableFuture<R> 를 통해 결과를 전달할 것을 약속합니다.

 

Task Buffer

task 가 들어온 순서대로 처리 대상에 포함하기에는 queue 가 적합하고 또 여러 쓰레드에서 접근하게 되므로 임계 영역을 다뤄주는 ConcurrentLinkedQueue 를 task 를 모으는 buffer 로 사용합니다. buffer 는 request, response 의 쌍인 Task 를 담습니다.

public class Linger<T, R> {

  private final Queue<Task<T, R>> queue = new ConcurrentLinkedQueue<>();
  // ...

  @Value
  private static class Task<T, R> {
    T request;
    CompletableFuture<R> response = new CompletableFuture<>();
  }

  public CompletableFuture<R> applyAsync(final T request) {
    final var task = new Task<T, R>(request);
    put(task);
    return task.getResponse();
  }

  private void put(final Task<T, R> task) {
    synchronized (queue) {
      queue.add(task);
      // ...
    }
  }

  /**
   * queue 를 모두 비웁니다.
   *
   * @return queue 에 있던 모든 작업 목록
   */
  private List<Task<T, R>> poll() {
    synchronized (queue) {
      // 작업 처리량
      final var size = queue.size();

      // queue 에서 이번 수행에 포함할 항목을 추출
      final var tasks = new ArrayList<Task<T, R>>(size);
      for (int i = 0; i < size; i++) {
        tasks.add(queue.poll());
      }
      // ...
      return tasks;
    }
  }
  // ...
}

linger 의 queue 는 다음과 같은 기능이 필요합니다.

  • 하나의 추가 요청을 Task 로 등록합니다.
  • 지금 queue 에 등록된 모든 Task 를 꺼내어 queue 를 비웁니다.

queue 의 단일 기능 수행을 위해서라면 ConcurrentLinkedQueue 의 지원으로 충분하지만 queue 를 대상으로 하는 일련의 기능들이 원자적으로 다뤄지기 위해 synchronized 블록을 사용합니다. 이제 #applyAsync(T) 의 구현이 가능합니다. task 를 만들어 이를 queue 에 넣고 CompletableFuture 인 response 를 반환합니다.

 

수행

#poll() 에 의해 꺼낸 Task 들을 수행합니다. action 은 순서를 기반으로 request 와 그 result 를 연관지어 줄 것을 전제합니다. 그러면 action 을 수행한 결과를 받아 이를 순서에 맞춰 Task#response 에 전달합니다. 만약 오류가 있었다면 이번에 포함된 모든 작업에 이를 전달합니다.

  private void act(final List<Task<T, R>> tasks) {
    try {
      final var requests = new ArrayList<T>(tasks.size());
      for (Task<T, R> task : tasks) {
        requests.add(task.getRequest());
      }

      // 다건 수행
      final var results = action.apply(requests);

      for (int i = 0; i < results.size(); i++) {
        // 순서에 맞춰 response 에 결과를 반환
        tasks.get(i).getResponse().complete(results.get(i));
      }
    } catch (Throwable t) {
      for (Task<T, R> task : tasks) {
        // 다건 수행 실패 시 모든 response 에 에러 전파
        task.getResponse().completeExceptionally(t);
      }
    }
  }

 

트리거

linger 의 가장 핵심은 두 가지의 상보적인 트리거에 있습니다. 이에 앞서 수행 시 필요한 작업 쓰레드를 마련할 ExecutorService 도 필요합니다. 필요한 상황마다 성격에 맞는 Thread Pool 을 선택할 수 있을 것 같습니다.

@RequiredArgsConstructor
public class Linger<T, R> {

  private final Function<List<T>, List<R>> action;
  private final ExecutorService executor
  private final int count;
  private final long ms;
  // ...

count 와 ms 는 각각 count threshold 와 time threshold 를 위한 설정값입니다.

 

이제 먼저 상대적으로 간단한 count threshold 를 살펴보겠습니다.

  private void put(final Task<T, R> task) {
    synchronized (queue) {
      queue.add(task);
      // ...
      if (queue.size() >= count) {
        // count threshold 만족
        final var tasks = poll();
        executor.execute(() -> act(tasks));
      }
    }
  }

사실 count threshold 는 Task 를 추가하는 행위에 의해 발생할 수 있습니다. 그래서 Task 를 추가하는 행위와 같은 임계 영역에서 다뤄져야 합니다. 그렇기에 이 #put(Task<T,R>) 메서드는 단순히 작업을 추가하는 기능 외에 추가로 인한 threshold 들을 검사하는 기능을 포함합니다. 만약 현재 queue 가 count threshold 를 만족하는 Task 를 보유하게 되었다면 #poll() 로 모두 꺼내어 작업 쓰레드에 전달합니다. 이 때, #poll() 을 통해 작업을 꺼내는 동작까지는 이 임계 영역에 포함된 곳에서 수행합니다. 이것으로 count threshold 에 의한 트리거를 구현하였습니다.

 

이번엔 time threshold 를 살펴보겠습니다.

 

먼저 time threshold 는 다음과 같은 규칙을 갖습니다.

  • queue 가 비어있지 않은 시간을 잽니다.
  • queue 가 비어있는 동안에는 동작하지 않습니다.
  • timer 가 다 되면 작업 쓰레드에 현재 queue 를 비우고 꺼낸 작업을 작업 쓰레드에 전달합니다.

다음은 먼저 timer 를 시작하고 끄는 과정입니다. time threshold 에 도달했을 때 수행할 작업은 TimerTask 를 통해 등록하며 이 인스턴스는 어떤 시점에서든 없거나 하나만 있어야 합니다. 이는 역시 경쟁 조건에 놓인다는 것이고 volatile 접근자로 경쟁을 올바로 다룰 수 있도록 합니다. timerTask 가 null 인 상태를 timer 가 멈춘 상태로 간주합니다.

public class Linger<T, R> {

  private final Timer timer = new Timer();
  private volatile TimerTask timerTask;
  // ...

  public synchronized void setTimer() {
    if (timerTask == null) {
      timerTask = new TimerTask() {
        @Override
        public void run() {
          executor.execute(() -> act(poll()));
        }
      };
      timer.schedule(timerTask, ms);
    }
  }

  public synchronized void resetTimer() {
    if (timerTask != null && queue.isEmpty()) {
      timerTask.cancel();
      timerTask = null;
      timer.purge();
    }
  }

timer 에 관여하는 접근은 timertimerTask 이며 역시 경쟁 조건이 있습니다. 특히 timerTask 는 인스턴스가 바뀌기 때문에 임계 영역은 Linger 의 인스턴스로 확장되어야 합니다. 따라서, 이를 다루는 메서드는 synchronized 로 선언합니다. #setTimer() 는 이미 동작 중인 timer 를 확인하여 없을 때에만 새로 timer 를 시작합니다.

 

이제 timer 가 동작할 곳을 찾아야 하는데 조건은 queue 에 있습니다.

  • queue 에 Task 가 생기는 시점부터 timer 를 시작합니다.
  • queue 가 비워진다면 timer 를 끕니다.

각각 #put(T)#poll() 시점에 넣어주면 됩니다.

  private void put(final Task<T, R> task) {
    synchronized (queue) {
      queue.add(task);
      // 이미 등록한 schedule 이 없다면 새 schedule 을 설정할 것입니다.
      setTimer();
      // ... count threshold
    }
  }

  private List<Task<T, R>> poll() {
    synchronized (queue) {
      // ... 작업 꺼내기
      // queue 를 비우면서 schedule 된 내용도 초기화 합니다.
      resetTimer();

      return tasks;
    }
  }

이제 동작을 확인해보도록 하겠습니다.

 

기능 확인

이것은 사실 테스트는 아닙니다. 기능을 로그로 확인해보려고 합니다.

@Slf4j
public class LingerSpec {

  @Test
  public void linger() throws InterruptedException {
    final var count = 10; // count threshold
    final var ms = 2000; // time threshold
    final var linger = Linger.of(this::batch, Executors.newFixedThreadPool(5), count, ms);

    runAsync(() -> log.info("A > {}", linger.applyAsync("A").join()));
    runAsync(() -> log.info("B > {}", linger.applyAsync("B").join()));

    sleep(2100);
    runAsync(() -> log.info("C > {}", linger.applyAsync("C").join()));
    runAsync(() -> log.info("D > {}", linger.applyAsync("D").join()));
    runAsync(() -> log.info("E > {}", linger.applyAsync("E").join()));

    sleep(1000);
    log.info("F > {}", linger.applyAsync("F").join());
  }

  private List<String> batch(List<String> params) {
    try {
      log.info(params.toString());
      sleep(4000);
    } catch (InterruptedException ignored) {}

    return params;
  }
}

A, B 를 요청한 후 2초가 조금 더 지나고서 C, D, E, F 를 요청합니다. (F 요청과 그 앞의 sleep 은 테스트의 원활한 수행을 위해 join 을 잡아두려는 트릭입니다.)

 

다음은 count threshold: 10, time threshold: 2000 의 결과 입니다.

15:45:16.746 [pool-1-thread-1] - [B, A]
15:45:18.844 [pool-1-thread-2] - [C, D, E, F]

15:45:20.755 [commonPool-worker-5] - B > B
15:45:20.755 [commonPool-worker-19] - A > A
15:45:22.848 [commonPool-worker-13] - E > E
15:45:22.848 [commonPool-worker-27] - C > C
15:45:22.848 [main] - F > F
15:45:22.848 [commonPool-worker-9] - D > D

처음 A, B 를 요청한 후 2초가 지나도록 다음 요청이 없자 time threshold 에 의해 동작을 수행하였습니다. 그후 C, D, E, F 를 요청한 것도 count threshold 10 에 미치 못하여 2초 후 time threshold 에 의해 동작을 수행합니다. 여기서 F 는 1초 뒤에 도착한 요청이지만 C 를 요청한 순간부터 timer 가 동작하였음을 두 번째 수행의 시각을 통해 알 수 있습니다. 결과 역시 각자의 thread 에 잘 도착한 것도 확인할 수 있습니다.

 

다음은 count threshold: 3, time threshold: 10000 의 결과 입니다.

15:46:03.023 [pool-1-thread-1] - [B, A, C]
15:46:04.023 [pool-1-thread-2] - [E, D, F]

15:46:07.028 [commonPool-worker-5] - B > B
15:46:07.028 [commonPool-worker-19] - A > A
15:46:07.028 [commonPool-worker-27] - C > C
15:46:08.028 [commonPool-worker-23] - E > E
15:46:08.028 [main] - F > F
15:46:08.028 [commonPool-worker-9] - D > D

이번엔 A, B 가 도착하고서 2초가 좀 지난 후에 C, D, E, F 의 요청이 들어왔는데 C 요청이 들어오는 순간 count threshold 에 도달하여 수행이 이뤄집니다. 역시 F 요청이 들어오는 순간 count threshold 에 도달하고 동작을 수행합니다. 각각을 수행한 시각을 보면 time threshold 에 관여 없이 동작했음을 확인할 수 있습니다.

 

다음은 count threshold: 3, time threshold: 2000 의 결과 입니다.

15:46:27.807 [pool-1-thread-1] - [B, A]
15:46:27.898 [pool-1-thread-2] - [C, D, E]
15:46:30.905 [pool-1-thread-3] - [F]

15:46:31.813 [commonPool-worker-5] - B > B
15:46:31.813 [commonPool-worker-19] - A > A
15:46:31.900 [commonPool-worker-13] - E > E
15:46:31.900 [commonPool-worker-27] - C > C
15:46:31.900 [commonPool-worker-9] - D > D
15:46:34.905 [main] - F > F

처음 A, B 를 요청한 후에 2초가 지나도록 다음 요청이 오지 않아 time threshold 가 동작합니다. 그리고는 C, D, E 를 요청하면서 count threshold 에 도달하여 바로 동작을 수행합니다. 마지막으로 F 는 이후 요청이 없어 2초 후에 수행하는 동작에 포함되었습니다.

 

정리

앞서 언급한 대로 저는 DB 호출을 포함한 상황에서 호출에 따르는 오버헤드를 줄여보려는 미션이 있었습니다. 이에 Linger 를 적용하였습니다. 이는 기존 대비 5배의 TPS 를 견딜 수 있도록 해 주었습니다. 물론 그만큼 요청의 평균 응답 시간은 길어졌습니다. 모아서 처리한다는 것은 그만큼 지체된 처리가 된다는 점이니까요. 그래서 threshold 값을 너무 높게 설정하는 것 역시 부담이 되었습니다.

 

linger 는 모아서 처리하는 만큼 분명 요청들의 응답 시간을 늘립니다. 그런데 오버헤드를 낮춰 처리시간은 줄일 수 있습니다. 처리시간을 줄이는 것이 응답 시간에 바로 연관되지 않는다는 점은 재미난 특성입니다. 줄어든 처리시간으로 인해 늘어난 응답 시간의 효과를 상쇄할만한 수준에서 조정하는 것이 필요했습니다.

 

이번에는 비싼 비용을 다루는 방법으로 kafka 등에서 사용하는 linger 전략을 적용해본 사례를 소개해보았습니다. 이를 통해 경쟁 조건을 확인하고 이를 다루는 임계 영역을 설정하거나 각 목적의 쓰레드 사이에서 결과를 안전하게 전달하기 위해 비동기 도구를 사용해 보았습니다.

 

긴 글 읽어주셔서 감사합니다.

댓글