티스토리 뷰

Infra

Redis Stream 적용기

지마켓 기술블로그 2024. 7. 11. 15:55

안녕하세요 Data Product 팀 박상우입니다.

 

이번에 제가 소개해드릴 내용은 팀 내 session Info data 적재 및 API 서비스 구축에 적용한 Redis Stream에 대한 이야기입니다.

 

저희 팀에서는 User의 행동 정보를 수집하는 프레임워크 중 하나인 montelena receiver를 통해 수집한 데이터 (view, event, impression 등)를 post Processor라는 데이터 파이프라인 application을 통해 적재, 가공해서 각종 지표 트래킹 및 분석에 활용할 수 있도록 제공하고 있습니다.

 

Post Processor Data Pipeline

 

그중 유니크한 active user를 식별하기 위해 session_id를 발급하고, 그 히스토리를 남겨 광고에 활용하고 있는데,

'Big Smile Day' (지마켓 최고의 연례행사인 빅스마일데이, 이하 BDS)

'User Targeting Content' (이하 UTC) Push 발송 등 유저의 유입이 급증해서 트래픽이 대폭 증가할 경우 데이터 처리가 지연되는 현상이 발생하게 되었습니다.

 

이 문제를 해결하기 위해서는 부하를 발생시키는 로직을 분리해서 별도로 처리하도록 하는 application의 개발이 요구되는 상황이었고,

이에 Redis stream을 사용해서 session_id 히스토리 적재 로직을 수행하는 신규 consumer를 개발했던 과정을 간단하게나마 공유해보고자 합니다.

 

 

 

Redis Stream의 특징과 장점

 

2018년 10월 17일, Redis 5.0 버전이 출시되었습니다.

 

 
 
 

이전 버전에서 많은 부분이 개선되었지만 그중 가장 중요한 기능 추가 중 하나가 바로 Redis stream 이였는데요.
고가용성 데이터 스트리밍 처리가 도입되면서, 데이터의 일관성과 안정성을 보장하면서 대용량 데이터 스트림을 실시간으로 처리할 수 있게 되었습니다.
동시에 inmemory 기반으로 동작하는 key value 기반의 캐시를 사용하기 때문에 속도가 빠르다는 장점으로 사내에서도 저장소로 널리 사용되고 있죠.

 

 

왜 Redis Stream을 선택했는가?

 

처음에는 메시지 큐로 kafka나 MQ를 생각했었는데, 새로운 플랫폼을 적용해야 하다 보니 개발 공수도 늘어나고 리소스도 많이 소모될 거라는 결론을 내렸습니다.

거기다 BSD가 얼마 남지 않은 시점이라 그전에 개발을 완료해야 된다는 시간적인 제한도 허들이었습니다.

이미 session_id 저장소로 redis를 사용하고 있었고, 최대한 기존 로직을 건드리지 않으면서, 빠르게 히스토리를 적재할 수 있는 방법을 찾던 중,

kafka와 유사한 기능들을 제공하면서 사내 openshift 환경의 여러 개 pod에서 구동해도 데이터 중복이나 유실 없이 처리가 가능한 Redis stream을 선택하게 되었습니다.

 

 

 

Redis Pub/Sub과 Redis Stream?

 

일반적으로 Redis를 이용해 메시지를 Broadcasting 할 때는 pub/sub을 많이 사용합니다.

하지만 이 방식은 publisher가 메시지를 발행했을 때 subscriber가 존재하지 않거나 애플리케이션에 이슈가 발생하면 수신 여부에 관계없이 메시지가 휘발되는 단점이 있습니다.

또한, 여러 개의 subscriber를 구동하면 모두에게 동일한 메시지를 발행해 데이터가 중복되는 이슈가 발생합니다.

 

Redis Pub / Sub

 

반면, Redis Stream은 휘발성이 아니라 Kafka의 offset 개념처럼 마지막으로 수신한 record id를 저장하고 XADD, XREADGROUP, XACK, XPENDING, XCLAIM으로 이어지는 처리 프로세스를 통해 메시지를 컨트롤할 수 있는 다양한 방법을 제공합니다.

 

Redis Stream

출처 : https://jybaek.tistory.com/935

 

 

또한, Redis Stream은 consumer group을 지원하기 때문에 producer가 발행한 메시지를 여러 개의 consumer가 하나의 그룹을 형성해서 중복 없이 순차적으로 병렬 처리할 수 있습니다.

그리고 XACK 명령어를 사용해 메시지 처리 여부를 확인할 수 있으며, 일정 시간 동안 처리되지 못한 메시지들도 Pending Entries List를 이용해서 재처리할 수 있는 방법을 제공합니다.

 

 

 
Redis Stream

출처 : https://jybaek.tistory.com/935

 

 

Redis stream Development

 

이제 실제로 코드를 보며 Redis stream으로 어떻게 개발을 진행했는지 간단히 알아보도록 하겠습니다.

프로세스는 심플한 구조로 아래 flow chart를 참고해 주시면 되겠습니다.

 

 

 

 
Redis Stream flow

 

<Publisher>

먼저 Post Processor에서 트래픽 데이터를 가공/처리한 후, app에서 정의한 streamKey를 통해서 생성한 session 객체를 캡슐화해서 Redis Stream 메시지로 발행합니다.

ObjectRecord<String, String> record = StreamRecords.newRecord()
				.ofObject(GsonUtil.gson().toJson(session))
				.withStreamKey(streamKey);

RecordId recordId = gmktRedisTemplate.opsForStream().add(record);
if (Objects.isNull(recordId)) {
    // Redis Stream record 처리에 실패했을 경우 로직 추가
}
 

opsForStream().add() 메서드를 통해서 메시지를 발행하면 recordId를 리턴하는데, 이 값은 각각의 메시지가 스트림에 추가될 때 Redis가 생성해 주는 고유의 id 값으로 보시면 됩니다.

저는 stream key / value 형태의 ObjectRecord 객체를 사용해서 개발했지만,

이외에도 value 값으로 Map<K, V> 형태로 데이터를 저장하고 읽을 수 있는 MapRecord라는 메서드도 지원하고 있습니다.

public interface MapRecord<S, K, V> extends Record<S, Map<K, V>>, Iterable<Map.Entry<K, V>> {

	static <S, K, V> MapRecord<S, K, V> create(S stream, Map<K, V> map) {

		Assert.notNull(stream, "Stream must not be null");
		Assert.notNull(map, "Map must not be null");

		return new MapBackedRecord<>(stream, RecordId.autoGenerate(), map);
	}
}
 

 

<Consumer>

다음은 발행된 메시지를 컨슈밍 하고 처리하는 코드를 보여드리겠습니다.

먼저 Spring boot 프로젝트에서 'spring-boot-starter-data-redis' 의존성을 추가하고 Redis 서버 설정을 완료합니다.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
 

그다음 'StreamListener' 인터페이스를 구현하고, afterPropertiesSet() 메서드를 @Orverride 해서 Redis Stream 메시지를 소비하는 consumer Group과 Listener Container를 설정하고 초기화합니다.

@Override
public void afterPropertiesSet() throws Exception {
    // Consumer Group 설정
    createStreamConsumerGroup(streamKey, consumerGroupName);

    // StreamMessageListenerContainer 설정
    this.listenerContainer = StreamMessageListenerContainer.create(
				redisTemplate.getConnectionFactory(),
				StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
				.targetType(String.class)
				.pollTimeout(Duration.ofSeconds(2))
				.build()
		);

    // Subscription 설정
    this.subscription = this.listenerContainer.receive(
            Consumer.from(this.consumerGroupName, consumerName),
            StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
            this
    );

    // Redis listen 시작
    this.listenerContainer.start();
}

public void createStreamConsumerGroup(String streamKey, String consumerGroupName){
	if (!redisTemplate.hasKey(streamKey)){
		RedisClusterAsyncCommands commands = (RedisClusterAsyncCommands) this.redisTemplate
				.getConnectionFactory()
				.getClusterConnection()
				.getNativeConnection();
		
        CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
				.add(CommandKeyword.CREATE)
				.add(streamKey)
				.add(consumerGroupName)
				.add("0")
				.add("MKSTREAM");

		commands.dispatch(CommandType.XGROUP, new StatusOutput(StringCodec.UTF8), args);
	}
	else{
		if(!isStreamConsumerGroupExist(streamKey, consumerGroupName)){
			this.redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
		}
	}
}
 

Publisher에서 설정한 StreamKey와 Consumer Name을 세팅해서 Consumer Group을 생성합니다.

그리고 Consumer 객체를 생성, Offset 설정, Subscriber 옵션 등 세부적인 설정을 완료한 뒤, Listener Container를 시작하면 Redis Stream 메시지를 비동기적으로 Listner에게 전달하게 됩니다.

이 과정을 통해서 여러 개의 Consumer Group을 사용해서 하나의 Redis Stream을 병렬로 처리할 수 있습니다.

 

이렇게 전달되는 메시지들은 마찬가지로 'StreamListener' 인터페이스가 제공하는 onMessage 메서드를 구현해서 처리하게 됩니다.

@Override
public void onMessage(ObjectRecord<String, String> message) {
    
    String stream = message.getStream();
    String recordId = message.getId().getValue();
    
    try {
        // 처리할 로직 구현
        if (StringUtils.isNotEmpty(message.getValue())) {
            // To-Do
        }
        // 이후, ack stream
        this.redisTemplate.opsForStream().acknowledge(streamKey, consumerGroupName, recordId);
        
    } catch (Exception e) {
        // TODO: handle exception
        e.printStackTrace();
        this.redisOperator.delete(stream, recordId);
    }
}
 

메시지를 처리하고 나면 'ackStream' 메소드를 호출해서 Redis Stream 메시지가 성공적으로 처리되었음을 알리고, 해당 메시지를 대기열에서 제거합니다.

Exception 이 발생할 경우 recordId를 통해 메시지를 삭제 처리할 수도 있고, 다양한 시나리오로 처리가 가능하니 용도에 맞게 사용하시면 되겠습니다.

 

 

 

Redis stream 적용 시 고려할 점들

 

Redis Stream은 파티션 개념이 없기 때문에 하나의 stream을 여러 개의 consumer가 병렬 처리하는 구조로 동작합니다.

그래서 Redis cluster 구조에서 sharding 되어 있는 node에 메시지를 고르게 보내려면 추가적으로 N개의 stream을 각각의 node에 할당하도록 추가 개발이 필요합니다.

이러한 특징 때문에 produce 된 순서대로 메시지를 처리한다는 보장이 없어지는데, 이는 실서비스에 적용할 때 반드시 고려해야 할 점입니다.

Partition을 지원하는 Kafka cousumer group의 구조
단일 Stream Redis stream cousumer group의 구조

출처 : https://mattwestcott.org/blog/redis-streams-vs-kafka

 

 

그리고 또 한 가지.

in-memory 기반의 저장소이기 때문에 memory 관리에 신경을 많이 써야 합니다.

예를 들어, XACK를 받지 못한 pending 메시지를 다시 처리하지 않으면, 그 메시지들은 점점 쌓여가면서 Redis cluster의 memory를 위태롭게 만들 것입니다.

그래서 저도 1분마다 스케줄을 돌며 pending 메시지를 처리하는 로직을 추가해 memory full 이슈를 방지하고 있습니다.

@Scheduled(fixedRate = 60000)
public void PendingMessagesSummaryScheduler() {
    PendingMessagesSummary pendingSummary = this.redisOperator.pendingSummary(streamKey, consumerGroupName);

    // 로그 출력
    pendingSummary.getPendingMessagesPerConsumer().forEach((consumer, count) -> {
        log.info("Consumer: " + consumer + ", Pending Messages: " + count);
    });

    // pendingSummary와 TotalPendingMessages 체크 및 메시지 처리
    if (pendingSummary != null && pendingSummary.getTotalPendingMessages() > 0) {
        PendingMessages pendingMessages = this.redisOperator.pending(streamKey, consumerGroupName);
        pendingMessages.toList().stream()
            .filter(pendingMessage -> !ObjectUtils.isEmpty(pendingMessage))
            .forEach(pendingMessage -> {
                List<ObjectRecord<String, String>> messages = this.redisOperator.read(streamKey, consumerGroupName, consumerName, pendingMessage.getIdAsString());
                messages.stream()
                    .filter(recordMessage -> !ObjectUtils.isEmpty(recordMessage))
                    .forEach(recordMessage -> {
                        // 메시지 처리 로직

                    });
                // 메시지 ACK
                this.redisTemplate.opsForStream().acknowledge(streamKey, consumerGroupName, pendingMessage.getIdAsString());
            });
    }
}
 

 

 

 

마치며

 

Redis Stream 적용 전
 
Redis Stream 적용 후

데이터 파이프라인 applicaion에 Redis Stream을 적용 후,

UTS Push 발송 시 지연되던 트래픽 처리가 완전하게 해소되어, BDS 행사기간에도 원활하게 서비스가 가능하게 되었습니다.

Redis stream은 분산 처리 환경에서 쉽고 빠르게 실시간 데이터 처리를 가능하게 만들어주는 아주 효율적인 도구입니다.

이 포스팅을 통해서 Redis Stream의 개념과 동작 방식에 대해서 간단하게나마 파악하고, 추후 Redis Stream을 이용한 애플리케이션을 구축하는데 작은 도움이 되었으면 하는 바람입니다.

 

 

 

 

 

 

<참고문서>

https://nimasrn.medium.com/introduction-to-redis-streams-1d6a95ab141

https://redis.io/docs/latest/develop/data-types/streams/

https://jybaek.tistory.com/935

https://kingjakeu.github.io/springboot/2022/02/10/spring-boot-redis-stream/

댓글