봉황대 in CS

[Server] Redis Pub/Sub을 통해 Local cache 동기화하기 본문

Server

[Server] Redis Pub/Sub을 통해 Local cache 동기화하기

등 긁는 봉황대 2025. 2. 3. 23:42

Global cache vs. Local cache


분산 환경에서 Global cache는 다수의 서버가 바라볼 수 있는 캐시를 말한다. 예를 들어, MSA 환경에서 Redis를 캐시 서버로 띄워놓고, 여러 서버들이 DB에 있는 원본 데이터를 조회하는 것이 아니라 Redis에 캐싱되어 있는 데이터를 조회하는 것이다. 이를 통해 DB connection 요청을 줄일 수 있으며 Redis의 빠른 조회 성능을 통해 전체 시스템의 성능도 향상할 수 있다. 하지만 이는 원래 DB로 가야 했던 요청이 캐시 서버로 옮겨진 것일 뿐이라, 만약 조회가 매우 빈번하게 발생하는 데이터인 경우에는 캐시 서버가 그 부하를 모두 견뎌야 한다. 그리고 캐시 서버에서 데이터를 가져올 때마다 서버에서 메모리 할당이 발생하고 이후 GC가 발생하는 등 여러 문제가 존재한다.

 

이를 해결하기 위해서 Local cache를 도입할 수 있다. Local cache는 Global cache와 반대로, 서버마다 자신의 메모리 등에 따로 저장하는 캐시를 말한다. 즉, 서버 자신만이 이 캐시에 접근할 수 있는 것이다. Cache hit가 발생하는 경우, 서버 내에 존재하는 데이터를 사용하면 되는 것이라 네트워크를 타지 않아도 되며 데이터의 위치가 물리적으로 가깝기 때문에 조회 속도가 매우 빠르다. 메모리 할당 및 GC가 빈번하게 발생하는 문제 또한 발생하지 않는다.

 

하지만 Local cache를 도입하게 된다면, 캐시를 갱신해야 하는 경우에 이를 서버에게 어떻게 알려서 데이터를 갱신하도록 할지가 복잡해진다. 데이터가 여러 서버들에 분산되어 저장되기 때문에 캐시 동기화 정책이 복잡해지는 것이다. 이 글에서는 해당 문제를 Redis Pub/Sub을 통해 해결하고자 한다.

 

Overall Architecture


서버가 3개 떠있고, Load balancer가 요청을 받으면 알맞은 서버로 라우팅 해준다고 하자. 이때 Redis는 DB와 서버들 사이 중간 다리 역할을 하는 캐시 서버이자 데이터 갱신 여부를 알려주는 역할을 맡는다. 로컬 캐시를 사용할 것인데 왜 Redis에게 캐시 서버 역할 또한 맡기느냐?라고 묻는다면, 만약 로컬 캐시의 데이터가 invalidate 된 경우에 바로 DB에 질의하여 데이터를 가져오는 것이 아니라 Redis에 먼저 질의함으로써 DB 요청을 최대한 줄이고자 하였다. 이렇게 한다면 DB 요청은 Redis에 있는 데이터를 갱신하는 경우에 단 한 번만 발생할 것이다. DB가 죽으면 거의 모든 시스템이 마비되기 때문에 DB에 대한 요청은 줄일수록 좋다.

 

 

정리하자면, 캐싱하고자 하는 데이터 α가 캐싱되는 위치는 다음과 같다. ⭕️ 표시는 캐시가 유효한 경우, ❌ 표시는 캐시가 유효하지 않은 경우(== 갱신이 필요한 경우)라고 하자.

 

 

진행 흐름

한 서버에게 도달한 요청에 의해 데이터 α에 갱신이 발생했다고 하자. 그 순간, Redis와 각 서버의 Local cache에 올라가 있는 데이터들은 모두 유효하지 않게 된다. 전부 최신 데이터로 갱신이 필요하다는 것이다.

 

 

먼저 Redis에 올라가 있는 데이터를 갱신한다.

 

 

각 서버들은 자신이 가지고 있는 데이터 α에 대한 정보들이 유효하지 않은 정보인 것을 모르기 때문에 외부에서 누군가가 그 정보를 알려주어야 하는데, 이 역할을 Redis Pub/Sub이 진행하도록 할 것이다.

 

 

잠깐 Redis CLI를 통해서 Redis Pub/Sub이 어떻게 진행되는 놈인지 간단히 알아보자. 우선 Subscriber는 채널을 구독하여 계속 listen 하게 된다.

SUBSCRIBE <channel>

 

Publisher는 채널에 어떤 메시지를 보내달라고 명령할 수 있다.

PUBLISH <channel> <message>

 

메시지를 Publish 하는 순간, Subscriber가 보낸 메시지를 바로 받은 것을 아래 예시에서 확인할 수 있다.

 

 

이 메커니즘을 통해 각 서버들한테 '너가 가지고 있는 데이터 α에 대한 캐시는 invalidate 하셈'라고 메시지를 보낼 것이다.

 

 

다시 본론으로 돌아와서 .. 먼저 데이터 α를 Local cache로 캐싱하는 서버들은 채널 α'를 구독하도록 한다. 그리고 데이터 갱신이 발생한 경우, 데이터 갱신을 진행한 서버에서 Redis에게 채널 α'에 캐시를 Invalidate 하라는 메시지를 Publish 하라고 명령하면 된다.

 

 

그러면 메시지를 받은 서버들, 즉 채널 α'에 대한 Subscriber들은 유효하지 않은 데이터를 삭제할 수 있고,

 

 

이후 데이터 α에 대한 조회 요청이 와서 Cache miss가 발생한 경우에 Redis에서 데이터를 가져와 캐싱하면 된다. Subscriber들이 메시지를 받은 순간에 데이터를 가져와도 무방하지만, 만약 변경이 자주 발생하는 경우에는 리소스 낭비로 이어질 수 있으므로 lazy 하게 데이터를 가져오도록 하였다. (but. 이는 데이터의 특성에 따라 설정하는 것이 좋다)

 

 

발생할 수 있는 문제는 더 없는가 ?

Redis Pub/Sub은 Subscriber의 메시지 수신 유무를 상관하지 않는다. 그냥 채널에 메시지를 보내고 끝이기 때문에 Subscriber가 메시지를 받지 못한 경우를 신경 쓰지 않는다는 것으로, 서버가 메시지를 받지 못해 Local cache를 갱신하지 못하는 경우도 충분히 발생할 수 있다. 이는 Local cache의 TTL을 적절히 설정하거나, Spring Scheduler(@Scheduled)를 통해 n분마다 한 번씩 Local cache를 갱신하도록 설정하면 해결할 수 있다.

 

 

구현 by. Caffeine


참고로 코드에서 CacheManager는 Redis 캐시 서버이다. Global cache를 다른 것으로 변경할 가능성이 있기 때문에 이 부분은 추상화하였는데 코드 이해에는 문제가 없을 것이다.

@RequiredArgsConstructor
@Component
public class LocalCacheManager {

    private static final String INVALIDATE_CACHE_CHANNEL = "invalidate-cache";
    public static final String EVENTS_CACHE_KEY = "events";

    private final CacheManager cacheManager;
    private final MessagePublisher messagePublisher;
    private final MessageSubscriber messageSubscriber;

    private static Cache<String, List<Event>> eventsLocalCache;

    @PostConstruct
    public void init() {
        cacheManager.init();
        setEventsLocalCache();
        setSubscriberMessageListener();
    }

    private static void setEventsLocalCache() {
        eventsLocalCache = Caffeine
                .newBuilder()
                .recordStats()
                .expireAfterWrite(Duration.ofMinutes(10))
                .maximumSize(1)
                .build();
    }

    private void setSubscriberMessageListener() {
        messageSubscriber.addMessageListener(
                INVALIDATE_CACHE_CHANNEL,
                (message, pattern) -> invalidateCache(new String(message.getBody())));
    }

    private void invalidateCache(String message) {
        if (message.equals(EVENTS_CACHE_KEY)) {
            eventsLocalCache.invalidate(message);
        }
    }

    public void publishInvalidateCacheMessage(String cacheName) {
        messagePublisher.publish(INVALIDATE_CACHE_CHANNEL, cacheName);
    }

    public List<Event> getEventsLocalCache() {
        return getLocalCache(eventsLocalCache, EVENTS_CACHE_KEY, new TypeReference<>() {
        });
    }

    private <T> T getLocalCache(
            Cache<String, T> localCache,
            String key,
            TypeReference<T> typeReference) {
        T cachedValue = localCache.getIfPresent(key);
        if (cachedValue != null) {
            return cachedValue;
        }
        cachedValue = loadFromCache(key, typeReference);
        if (cachedValue == null) {
            return null;
        }
        localCache.put(key, cachedValue);
        return cachedValue;
    }

    private <T> T loadFromCache(String key, TypeReference<T> typeReference) {
        String cachedValue = cacheManager.get(key);
        if (cachedValue == null) {
            return null;
        }
        return readValue(cachedValue, typeReference);
    }
}

 

 

아래는 Caffeine을 통해 캐시를 생성하는 부분이다. Caffeine은 고성능 캐싱 라이브러리로, 여타 캐시들에 비해 월등한 성능을 가지고 있어 채택하였다. 왜 다른 것들에 비해서 엄청난 성능을 가지는지는 더 공부해봐야 할 것 같다 ,, (Caffeine's design)

    private static Cache<String, List<Event>> eventsLocalCache;

    private static void setEventsLocalCache() {
        eventsLocalCache = Caffeine
                .newBuilder()
                .recordStats()
                .expireAfterWrite(Duration.ofMinutes(10))
                .maximumSize(1)
                .build();
    }

 

 

코드에서 가장 헷갈리면서 중요한 부분은 setSubscriberMessageListener 메서드이다. MessageSubscriber가 Cache invalidate를 위한 채널을 구독하고, 여기에서 메시지를 받으면 invalidateCache 메서드를 수행하도록 설정한 것이다.

    private void setSubscriberMessageListener() {
        messageSubscriber.addMessageListener(
                INVALIDATE_CACHE_CHANNEL,
                (message, pattern) -> invalidateCache(new String(message.getBody())));
    }

 

MessageSubscriber 구현체는 다음과 같다.

@RequiredArgsConstructor
@Component
public class RedisMessageSubscriberImpl implements MessageSubscriber {

    private final RedisMessageListenerContainer redisMessageListenerContainer;

    public void addMessageListener(String channel, MessageListener messageListener) {
        redisMessageListenerContainer.addMessageListener(
                messageListener,
                new ChannelTopic(channel));
    }
}

 

 

publishInvalidateCacheMessage 메서드를 호출하면, MessagePublisher가 채널에 메시지를 publish 한다.

    public void publishInvalidateCacheMessage(String cacheName) {
        messagePublisher.publish(INVALIDATE_CACHE_CHANNEL, cacheName);
    }

 

MessagePublisher 구현체는 다음과 같다.

@RequiredArgsConstructor
@Component
public class RedisMessagePublisherImpl implements MessagePublisher {

    private final RedisTemplate<String, String> redisTemplate;

    public void publish(String channel, String message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

 

 

^_______^

 

 

반응형

'Server' 카테고리의 다른 글

[Server] Offset 기반 vs. Cursor 기반 Pagination (feat. nGrinder 부하 테스트)  (0) 2025.01.20
REST API  (0) 2021.11.14
API / HTTP Packet / HTTP Method  (0) 2021.11.07
Key & Table 간의 관계  (0) 2021.10.25
Proxy  (0) 2021.10.11
Comments