티스토리 뷰

Infra

Kafka 이벤트 모니터링이란.. (먼산)

지마켓 안세희 2022. 10. 19. 16:28

안녕하세요. Carbon 팀 안세희입니다.
이 글은 Kafka 이벤트 모니터링을 진행하며 주니어 개발자로서 겪었던 트러블슈팅에 대한 이야기입니다.
여름에 입사하여 또 다른 계절인 겨울이 다가오는 시점에서 지난 미션을 떠올리며 공유드리고자 합니다.
같은 고민을 하시는 개발자분이 계신다면 부족한 내용일지라도 도움이 되었으면 좋겠습니다.

 

어떤 걸 모니터링해야 했을까?

해당 미션은 결제가 실패한 경우 발행되는 이벤트를 스트리밍 하여 10분 동안 3번 이상 주문에 실패한 구매자를 찾는 것이었습니다.
당시에 실제 운영하는 환경에서 Kafka를 사용해본 경험이 없어 Kafka에 집계와 모니터링을 얹은 보이지 않은 산을 마주한 기분이었습니다.

하지만 모르는 만큼 다양한 방법을 도입할 수 있다는 개발자의 패기로 아래와 같이 요구 사항을 리스트업하고 그에 맞는 기술 스택을 선택해나가며 문제를 해결하였습니다.

 

기술 스택을 선택하기까지

먼저 아래와 같은 요구 사항을 리스트업하고 이에 맞는 기술 스택을 선택하게 되었는데요.

  1. 결제 실패 이벤트를 스트리밍하여 10분 동안 3번 이상 주문에 실패한 구매자를 찾아야 한다. → KSQL
  2. 구매자를 찾았으면 내가 원하는 DB에 데이터를 저장하여야 한다. → Kafka Connect
  3. DB에 적재된 주문에 실패한 구매자를 시계열 데이터로 가시화하여야 한다. → Elasticsearch & Grafana

미션을 진행하며 직면한 문제를 풀 수 있는 다른 방안이 있었음에도 위와 같이 선택한 이유는 다음과 같습니다.

 

KSQL

  1. 추상화 수준이 높은 스트림즈 방식을 낮은 러닝 커브로 도입할 수 있다.
  2. 원천 Kafka 메시지가 파이프라인에 도착하는 즉시 이상 패턴을 감지할 수 있다.
  3. 토픽 간 조인하거나, 시간 값을 기준으로 나누어(Time Windowing) 상태 정보를 지속적으로 집계 및 조회할 수 있다.

Kafka Connect

  1. 반복적인 Kafka 기반 데이터 파이프라인을 효율적으로 개발 및 배포할 수 있다.
  2. Kafka의 토픽을 Input 으로 원하는 DB에 쉽게 연결 및 싱크(Sink)가 가능하다.
  3. 2개 이상으로 반복적으로 구성되어야 하는 파이프라인이다.

Elasticsearch & Grafana

  1. 여러 계층 구조로 데이터를 저장할 수 있으며, 한 번의 쿼리로 쉽게 조회가 가능하다.
  2. 내용 전체를 색인하여 특정 단어가 포함된 문서를 검색할 수 있다.
  3. 시계열 대시보드로 데이터를 시각화할 수 있다.
  4. Alert가 발생하면 Slack에 Alert를 전달할 수 있다.

 

간략한 모니터링 과정

아래와 같은 KSQL 쿼리로 10분 동안 3번 이상 반복해서 주문에 실패하는 구매자를 찾아내어 별도의 Kafka 이벤트를 발행하고자 하였습니다.
먼저 주문 실패 이벤트 토픽(order_failed)으로부터 주문 실패한 구매자 스트림을 만듭니다.

CREATE STREAM BUYERS_WITH_ORDER_FAILED (
  BUYER_NO STRING,
  ORDER_KEY STRING
)
WITH (KAFKA_TOPIC='order_failed', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

위 스트림으로부터 10분 동안 3번 이상 주문 실패한 구매자를 찾아내는 ORDER_FAILED_3_TIMES_IN_10_MINUTES 테이블을 만들고, 데이터가 집계될 때마다 테이블과 동명의 Kafka 토픽 이벤트를 발행합니다.

CREATE TABLE ORDER_FAILED_3_TIMES_IN_10_MINUTES 
    WITH (KAFKA_TOPIC='ORDER_FAILED_3_TIMES_IN_10_MINUTES') AS
SELECT 
    BUYER_NO,
    LATEST_BY_OFFSET(ORDER_KEY) ORDER_KEY,
    COUNT(*) ORDER_FAILED_COUNT
FROM BUYERS_WITH_ORDER_FAILED
WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY BUYER_NO
HAVING (COUNT(*) >= 3)
EMIT FINAL;

쿼리 내 EMIT FINAL로 10분 window 집계의 중간 집계 결과가 아닌 최종 결과만 이벤트로 발행하고자 하였는데요. 이는 메모리 버퍼를 사용하여 window 내 도착하는 모든 데이터를 집계하기에 쿼리가 지연되는 경우 Out Of Memory가 발생할 여지가 많았습니다.
따라서 EMIT CHANGE로 바꾸어 10분이 지나지 않았더라도 3번 이상 주문에 실패하였다는 조건에 충족하면 중간 집계 결과를 이벤트로 발행하도록 변경하였습니다.
이렇게 발행된 이벤트는 Kafka Connect를 통해 Elasticsearch에 싱크 되며, 결과적으로 Elasticsearch 내 주문의 Unique 한 키(ORDER_KEY)가 ID인 Document로 저장됩니다.

 

하지만 개발자에게 난관은 항상 존재하는 법이죠…!

 

트러블 슈팅

주문을 3번 이상 실패하였지만, 결과적으로 주문에 성공하게 된 구매자는 모니터링 대상에서 제외해야 하는 필요성이 생겼습니다.

특정 시점의 이슈로 문제가 생겼던 주문이었기에 일반적으로는 성공하는 주문이라 모니터링 대상에서 제외해야 했기 때문이었습니다.

처음에는 주문이 성공한 경우에 별도의 이벤트로 발행하고 있었기에 이를 스트리밍 하여 집계 쿼리에 반영하면 될 것이라 단순히 생각했습니다.

SELECT
    OFS.BUYER_NO,
    OFS.ORDER_KEY ORDER_KEY,
    OP.EVENT_KEY EVENT_KEY
FROM ORDER_FAILED_3_TIMES_IN_10_MINUTES_STREAM OFS
LEFT OUTER JOIN ORDER_PLACED OP WITHIN 10 MINUTES ON (OFS.ORDER_KEY = OP.ORDER_KEY)
EMIT CHANGES; -- 10분 이후에 성공된 주문은 탐색 불가

그러나 주문서를 열어두고 3번 이상 주문에 실패한 뒤 다음 날 주문을 재시도하여 성공할 수도 있기에 주문 성공 이벤트가 늦게 올 수 있어 KSQL로 Windowing 하여 무한정 기다릴 수는 없었습니다.

다음으로 Grafana에서 3번 이상 실패한 주문과 성공한 주문을 Transform 기능을 사용하여 Join 하고 주문에 성공한 데이터는 필터링하여 제거하려고 시도하였습니다.

해당 방법을 사용하려면 Join 하려는 모든 대상 데이터가 Grafana에서 조회되어야 하는데 Elasticsearch 데이터는 최대 500건만 조회가 가능한 이슈가 있었습니다.
물론 Grafana 내 기간을 짧게 설정하고 조회하면 데이터 수가 줄어 확인이 가능하겠지만, TPS가 높은 주문 서비스에서는 여전히 사용할 수 없는 방식이었습니다.

 

따라서 성공하지 않은 주문 데이터 그 자체가 필요했기에, 주문 완료 이벤트를 수신받으면 Kafka Connector가 Elasticsearch 내 해당 주문과 관련된 Document(ORDER_KEY가 ID인 Document)에 완료 상태를 Upsert 하는 방식을 채택하였습니다.

//3번 이상 주문에 실패한 0번 고객
{
    "_index": "bz_carbon_order_failed_3_time_2022.10",
    "_type": "_doc",
    "_id": "test-0",
    "_score": 1,
    "_source": {
      "@timestamp": "2022-10-12T08:20:38Z",
      "ORDER_FAILED_COUNT": 3,
      "ORDER_KEY": "test-0",
      "BUYER_NO": "0",
      "eventType": "OrderFailedEvent" // 주문 실패
    }
  }
//3번 이상 주문에 실패했지만 결과적으로 주문에 성공한 1번 고객
{
    "_index": "bz_carbon_order_failed_3_time_2022.10",
    "_type": "_doc",
    "_id": "test-1",
    "_score": 1,
    "_source": {
      "@timestamp": "2022-10-12T08:21:18Z",
      "ORDER_FAILED_COUNT": 4,
      "ORDER_KEY": "test-1",
      "BUYER_NO": "1",
      "eventType": "OrderPlacedEvent" // 주문 성공
    }
  }

위 방식을 통해 주문에 성공한 구매자는 제외된 주문 실패건만 필터링하여 모니터링이 가능한 시계열 대시보드를 아래와 같이 구성할 수 있었습니다.

 

글을 마무리하며

지금까지 제가 진행했던 Kafka 이벤트 모니터링 적용 방법에 대해 살펴보았는데요.

이렇게 미션을 진행하면서 가장 크게 들었던 고민은 기술적인 이슈가 아니라 "기존 카프카 이벤트 모니터링 방식이 있는데 잘못된 방식으로 도입하는 건 아닐까?"에 대한 고민이었습니다.  하지만 지금의 생각은 다른 비즈니스 업무에 비해 크리티컬 하지 않은 업무에 선 도입하여 사전에 해당 기술을 검증할 좋은 기회였다고 생각합니다.
또한 이번 미션을 진행하며 기술을 선택하는 데 있어 서비스의 규모나 데이터의 특성을 고려하며 결정해야 한다는 나름의 결론과 지식을 얻을 수 있었습니다.

 

이 글을 쓰는 시점에도 전반적인 기술에 대한 지식이 부족하여 미처 생각하지 못한 더 나은 해결 방법이 있을 거라 생각합니다.
혹시 다른 생각이 있으시면 댓글을 통해 서로 의견을 공유할 수 있으면 좋을 것 같습니다.

긴 글 읽어주셔서 감사합니다.

댓글