본문 바로가기
Develop Study/Redis 2025. 6. 19.

(25.06.19) 다중 서버에서의 Spring WebSocket

 

 

(25.05.30) ToyTalk WebSocket을 활용한 Chat 기능 구현 (Spring Boot)

• 키덜트 정보를 플랫폼 구애없이 대화방에서 나눌수 있는 특화된 채팅 플랫폼의 니즈• WebSocket 프레임 구현 서비스 구축기획목적">키덜트 사용자들이 공통 주제로 자유롭게 대" data-og-host="andr

andrew75313.tistory.com

 

이전의 ToyTalk 의 WebSocket 을 활용해서 채팅을 Backend  단에서 구현을 했다.

클라이언트-서버간 State 한 상태를 유지한채 양방향 통신을 하게 하는 것인데, 그렇다면 다중 서버에서는 어떻게 활용할 수 있을까 고민을 화던 중 

Public Class Dev {} 프로젝트에서 다른 팀원이 구현을 했던 Redis를 활용한 채팅 구현이 생각이 나 이를 다시 정리하고자 했다.

당시, 해당 기능을 개발한 팀원분은 구체적으로 어떻게 작동하는 지 Sprint 회의때 알려주지 않았기 때문에, ( 급하게 추가한 기능이기 때문에 AI를 활용)

이번에 심도있게 스스로 확인해보면서 정리하고자 한다.

 


Redis 의 Pub/Sub 을 통한 메시지 동기화

다중 서버에서의 Redis 의 중간 Broker 구조

 

  • 위와 같이 그린 구조에서 Redis에 Pub/Sub을 활용한 중간 Brocker역할, 메시지 동기화 역할
    • 메시지를 "chatrooms" 채널에 publish
    • 구독(subscribe) 중인 모든 서버의 **RedisMessageListenerContainer**가 수신
  • WebSocket 세션을 공유하지 않고도 메시지 동시에 전파 가능
    • 클라이언트는 하나의 서버만 연결되어도 다른 서버로 메시지를 Publish , 전달 받을 수 있음
  • teamId를 기준으로 구독 토픽을 라우팅도 가능
    • STOMP 로 Server 로 전달 → 역직렬화 → Redis 로 전달하기 때문에, team ID 단위로 토픽이 구분되어도 해당 구독을 한 클라이언트에게만 전달이 가능
  • 메시지 순서 및 일관성 유지 보장 (중요)
    • Redis Pub/Sub는 FIFO 기반이므로 서버 간 메시지 순서가 일관됨 → WebSocket 채팅처럼 순서가 중요한 실시간 시스템에 적합하게 사용할 수 있음 일종의 메시지 큐

Redis를 중간 Broker로 한 FLOW

@Configuration
public class RedisConfig {
    ...
    
    @Bean
    public RedisConnectionFactory redisConnectionFactory(){
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(host);
        redisStandaloneConfiguration.setPort(port);
        redisStandaloneConfiguration.setPassword(password);
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, ChannelTopic channelTopic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, channelTopic);
        return container;
    }
    
    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("chatrooms");
    }
    
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisSubscriber redisSubscriber) {
        return new MessageListenerAdapter(redisSubscriber, "onMessage");
    }
}
  1. WAS(Spring Application 서버) 시작 시,→ Redis와의 TCP 커넥션이 열리고 유지됨
  2. **RedisConfig**에서 **LettuceConnectionFactory**로 Redis와 연결됨
@Service
@RequiredArgsConstructor
public class RedisSubscriber implements MessageListener {
    
    private final ObjectMapper objectMapper;
    private final SimpMessagingTemplate messagingTemplate;
    private final ConcurrentMap<Long, MessagesResponseDto> messageCache = new ConcurrentHashMap<>();
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String messageBody = new String(message.getBody());
            MessagesResponseDto messagesResponseDto = objectMapper.readValue(messageBody, MessagesResponseDto.class);
            
            if (messagesResponseDto.getId() == null) {
                return;
            }
            if (messageCache.putIfAbsent(messagesResponseDto.getId(), messagesResponseDto) != null) {
                return;
            }
            
            messagingTemplate.convertAndSend("/topic/chatrooms/" + messagesResponseDto.getTeamsId(), messagesResponseDto);
        } catch (JsonProcessingException e) {
            throw new CustomException(ErrorCode.INVALID_REQUEST);
        }
    }
}
  1. **RedisMessageListenerContainer**가 Redis의 Pub/Sub 채널 **"chatrooms"**을 구독(Subscribe)
  2. → 서버는 Redis에서 publish되는 메시지를 **onMessage**로 수신할 준비를 마침
  3. 클라이언트가 **ws://{host}/ws**로 WebSocket 연결 요청
  4. → 이 과정에서 HTTP 1.1 → WebSocket 프로토콜 업그레이드 (Handshake) 발생
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
    
    @Autowired
    private JwtAuthorizationFilter jwtAuthorizationFilter;
    
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }
    
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*")
            .withSockJS();
    }
    
...
}
  1. **WebSocketConfig**에서 **SimpleBroker**를 설정함
    • /topic/**: 서버 브로커가 직접 클라이언트에게 STOMP 메시지를 broadcast 하는 주체, 주소
    • /app/**: 클라이언트가 서버의 @MessageMapping 메서드로 매핑, 메시지 전송
  2. Client A와 Client D는 동일한 **teamId**에 대한 채팅방 토픽
  3. **/topic/chatrooms/{teamId}**를 구독 (STOMP SUBSCRIBE)
@RestController
@RequestMapping("/api/chatrooms")
@RequiredArgsConstructor
public class ChatRoomsController {
    
    private final ChatRoomsService chatRoomsService;
    
    @MessageMapping("/chat.sendMessage")
    public void sendMessage(@Payload MessagesRequestDto messagesRequestDto) throws JsonProcessingException {
        chatRoomsService.sendMessage(messagesRequestDto);
    }
    
...
}
@Service
@RequiredArgsConstructor
public class ChatRoomsService {
    
...
    @Transactional
    public void sendMessage(MessagesRequestDto messagesRequestDto) throws JsonProcessingException {
        Users user = usersRepository.findByName(messagesRequestDto.getSender())
            .orElseThrow(() -> new CustomException(ErrorCode.USER_NOT_FOUND));
        ChatRooms chatRoom = chatRoomsRepository.findById(messagesRequestDto.getTeamsId())
            .orElseThrow(() -> new CustomException(ErrorCode.TEAM_NOT_FOUND));
        
        Messages message = Messages.builder()
            .contents(messagesRequestDto.getContent())
            .users(user)
            .chatRooms(chatRoom)
            .build();
        
        messagesRepository.save(message);
        
        MessagesResponseDto savedMessageDto = MessagesResponseDto.builder()
            .id(message.getId())
            .content(message.getContents())
            .sender(user.getName())
            .teamsId(message.getChatRooms().getId())
            .timestamp(message.getCreatedAt().toString())
            .username(message.getUsers().getName())
            .build();
        String messageJson = objectMapper.writeValueAsString(savedMessageDto);
        
        //여기서의 channelTopic은 위 RedisConfig 에서 지정한 것이므로 'chatrooms'가 됨
        redisTemplate.convertAndSend(channelTopic.getTopic(), messageJson);
    }
    
...
}
  1. Client A가 **/app/chat.sendMessage**로 메시지를 전송 (STOMP SEND)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
    
...
    
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
                
                String jwtToken = accessor.getFirstNativeHeader("Authorization");
                if (jwtToken != null && jwtToken.startsWith("Bearer ")) {
                    jwtToken = jwtToken.substring(7);
                    Authentication authentication = jwtAuthorizationFilter.getAuthentication(jwtToken);
                    if (authentication != null) {
                        SecurityContextHolder.getContext().setAuthentication(authentication);
                    }
                }
                
                return message;
            }
        });
    }
}
  1. 메시지는 ChannelInterceptor 를 통한 검증
    • HTTP Header 내 JWT 토큰이 추출
    • **jwtAuthorizationFilter로 사용자 인증 (SecurityContext**에 Authentication 저장 포함)
    • SRP 에 의해 분리되어야 하나 빠른 작성을 위해 익명 클래스스로 바로 작성되었음
  2. 6번의 **ChatRoomsService.sendMessage()**가 호출됨
    • 해당 메시지를 DB에 저장하고
    • 메시지를 Redis의 "chatrooms" 채널에 publish
  3. Redis는 "chatrooms" 채널을 구독 중인 모든 Spring 서버 인스턴스의 RedisSubscriber에 메시지를 전달
  4. 각 서버의 **RedisSubscriber.onMessage()**는
    • 해당 메시지를 역직렬화하고
    • **SimpMessagingTemplate.convertAndSend("/topic/chatrooms/{teamId}", message)**로 STOMP 메시지를 서버에 연결된 구독자에게만 전송
    • → 즉, Client D만 실제 이 메시지를 받게 됨 (Client A는 발신자이므로 수신 처리 없음)

 


생각보다 복잡한 절차를 거칠 수 있고, Redis의 역할을 RabbitMQ 또는 Kafka가 대신하는 만큼 어떤 미들웨어를 잡느냐에 따라서 구성하는 것이 달라질 수 있다.

하지만, Redis는 간단한게 Config를 통해서 Pub/Sub을 지정할 수 있는 점에서 멀티 서버에서 가장 간단하게 라투어, 중간 브로커 역할로 쓸 수 있는 가장 대표적인 예가 될 것이다.

 

단, 대용량 트래픽일 경우, 그리고 메시지 저장을 따로 Server 로직에서 구현을 해야한다는 점의 단점으로 가지고 있기 때문에,

관련 구성에 대한 이해를 두고, 다른 메시지 큐를 적용할 때도 사용할 수 있도록 한다.