(25.06.19) 다중 서버에서의 Spring WebSocket
(25.05.30) ToyTalk WebSocket을 활용한 Chat 기능 구현 (Spring Boot)
• 키덜트 정보를 플랫폼 구애없이 대화방에서 나눌수 있는 특화된 채팅 플랫폼의 니즈• WebSocket 프레임 구현 서비스 구축기획목적">키덜트 사용자들이 공통 주제로 자유롭게 대" data-og-host="andr
andrew75313.tistory.com
이전의 ToyTalk 의 WebSocket 을 활용해서 채팅을 Backend 단에서 구현을 했다.
클라이언트-서버간 State 한 상태를 유지한채 양방향 통신을 하게 하는 것인데, 그렇다면 다중 서버에서는 어떻게 활용할 수 있을까 고민을 화던 중
Public Class Dev {} 프로젝트에서 다른 팀원이 구현을 했던 Redis를 활용한 채팅 구현이 생각이 나 이를 다시 정리하고자 했다.
당시, 해당 기능을 개발한 팀원분은 구체적으로 어떻게 작동하는 지 Sprint 회의때 알려주지 않았기 때문에, ( 급하게 추가한 기능이기 때문에 AI를 활용)
이번에 심도있게 스스로 확인해보면서 정리하고자 한다.
Redis 의 Pub/Sub 을 통한 메시지 동기화

- 위와 같이 그린 구조에서 Redis에 Pub/Sub을 활용한 중간 Brocker역할, 메시지 동기화 역할
- 메시지를 "chatrooms" 채널에 publish
- 구독(subscribe) 중인 모든 서버의 **RedisMessageListenerContainer**가 수신
- WebSocket 세션을 공유하지 않고도 메시지 동시에 전파 가능
- 클라이언트는 하나의 서버만 연결되어도 다른 서버로 메시지를 Publish , 전달 받을 수 있음
- teamId를 기준으로 구독 토픽을 라우팅도 가능
- STOMP 로 Server 로 전달 → 역직렬화 → Redis 로 전달하기 때문에, team ID 단위로 토픽이 구분되어도 해당 구독을 한 클라이언트에게만 전달이 가능
- 메시지 순서 및 일관성 유지 보장 (중요)
- Redis Pub/Sub는 FIFO 기반이므로 서버 간 메시지 순서가 일관됨 → WebSocket 채팅처럼 순서가 중요한 실시간 시스템에 적합하게 사용할 수 있음 일종의 메시지 큐
Redis를 중간 Broker로 한 FLOW
@Configuration
public class RedisConfig {
...
@Bean
public RedisConnectionFactory redisConnectionFactory(){
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName(host);
redisStandaloneConfiguration.setPort(port);
redisStandaloneConfiguration.setPassword(password);
return new LettuceConnectionFactory(redisStandaloneConfiguration);
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, ChannelTopic channelTopic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, channelTopic);
return container;
}
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("chatrooms");
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber redisSubscriber) {
return new MessageListenerAdapter(redisSubscriber, "onMessage");
}
}
- WAS(Spring Application 서버) 시작 시,→ Redis와의 TCP 커넥션이 열리고 유지됨
- **RedisConfig**에서 **LettuceConnectionFactory**로 Redis와 연결됨
@Service
@RequiredArgsConstructor
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final SimpMessagingTemplate messagingTemplate;
private final ConcurrentMap<Long, MessagesResponseDto> messageCache = new ConcurrentHashMap<>();
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String messageBody = new String(message.getBody());
MessagesResponseDto messagesResponseDto = objectMapper.readValue(messageBody, MessagesResponseDto.class);
if (messagesResponseDto.getId() == null) {
return;
}
if (messageCache.putIfAbsent(messagesResponseDto.getId(), messagesResponseDto) != null) {
return;
}
messagingTemplate.convertAndSend("/topic/chatrooms/" + messagesResponseDto.getTeamsId(), messagesResponseDto);
} catch (JsonProcessingException e) {
throw new CustomException(ErrorCode.INVALID_REQUEST);
}
}
}
- **RedisMessageListenerContainer**가 Redis의 Pub/Sub 채널 **"chatrooms"**을 구독(Subscribe)
- → 서버는 Redis에서 publish되는 메시지를 **onMessage**로 수신할 준비를 마침
- 클라이언트가 **ws://{host}/ws**로 WebSocket 연결 요청
- → 이 과정에서 HTTP 1.1 → WebSocket 프로토콜 업그레이드 (Handshake) 발생
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private JwtAuthorizationFilter jwtAuthorizationFilter;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
...
}
- **WebSocketConfig**에서 **SimpleBroker**를 설정함
- /topic/**: 서버 브로커가 직접 클라이언트에게 STOMP 메시지를 broadcast 하는 주체, 주소
- /app/**: 클라이언트가 서버의 @MessageMapping 메서드로 매핑, 메시지 전송
- Client A와 Client D는 동일한 **teamId**에 대한 채팅방 토픽
- **/topic/chatrooms/{teamId}**를 구독 (STOMP SUBSCRIBE)
@RestController
@RequestMapping("/api/chatrooms")
@RequiredArgsConstructor
public class ChatRoomsController {
private final ChatRoomsService chatRoomsService;
@MessageMapping("/chat.sendMessage")
public void sendMessage(@Payload MessagesRequestDto messagesRequestDto) throws JsonProcessingException {
chatRoomsService.sendMessage(messagesRequestDto);
}
...
}
@Service
@RequiredArgsConstructor
public class ChatRoomsService {
...
@Transactional
public void sendMessage(MessagesRequestDto messagesRequestDto) throws JsonProcessingException {
Users user = usersRepository.findByName(messagesRequestDto.getSender())
.orElseThrow(() -> new CustomException(ErrorCode.USER_NOT_FOUND));
ChatRooms chatRoom = chatRoomsRepository.findById(messagesRequestDto.getTeamsId())
.orElseThrow(() -> new CustomException(ErrorCode.TEAM_NOT_FOUND));
Messages message = Messages.builder()
.contents(messagesRequestDto.getContent())
.users(user)
.chatRooms(chatRoom)
.build();
messagesRepository.save(message);
MessagesResponseDto savedMessageDto = MessagesResponseDto.builder()
.id(message.getId())
.content(message.getContents())
.sender(user.getName())
.teamsId(message.getChatRooms().getId())
.timestamp(message.getCreatedAt().toString())
.username(message.getUsers().getName())
.build();
String messageJson = objectMapper.writeValueAsString(savedMessageDto);
//여기서의 channelTopic은 위 RedisConfig 에서 지정한 것이므로 'chatrooms'가 됨
redisTemplate.convertAndSend(channelTopic.getTopic(), messageJson);
}
...
}
- Client A가 **/app/chat.sendMessage**로 메시지를 전송 (STOMP SEND)
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketSecurityConfig implements WebSocketMessageBrokerConfigurer {
...
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
String jwtToken = accessor.getFirstNativeHeader("Authorization");
if (jwtToken != null && jwtToken.startsWith("Bearer ")) {
jwtToken = jwtToken.substring(7);
Authentication authentication = jwtAuthorizationFilter.getAuthentication(jwtToken);
if (authentication != null) {
SecurityContextHolder.getContext().setAuthentication(authentication);
}
}
return message;
}
});
}
}
- 메시지는 ChannelInterceptor 를 통한 검증
- HTTP Header 내 JWT 토큰이 추출
- **jwtAuthorizationFilter로 사용자 인증 (SecurityContext**에 Authentication 저장 포함)
- SRP 에 의해 분리되어야 하나 빠른 작성을 위해 익명 클래스스로 바로 작성되었음
- 6번의 **ChatRoomsService.sendMessage()**가 호출됨
- 해당 메시지를 DB에 저장하고
- 메시지를 Redis의 "chatrooms" 채널에 publish
- Redis는 "chatrooms" 채널을 구독 중인 모든 Spring 서버 인스턴스의 RedisSubscriber에 메시지를 전달
- 각 서버의 **RedisSubscriber.onMessage()**는
- 해당 메시지를 역직렬화하고
- **SimpMessagingTemplate.convertAndSend("/topic/chatrooms/{teamId}", message)**로 STOMP 메시지를 서버에 연결된 구독자에게만 전송
- → 즉, Client D만 실제 이 메시지를 받게 됨 (Client A는 발신자이므로 수신 처리 없음)
생각보다 복잡한 절차를 거칠 수 있고, Redis의 역할을 RabbitMQ 또는 Kafka가 대신하는 만큼 어떤 미들웨어를 잡느냐에 따라서 구성하는 것이 달라질 수 있다.
하지만, Redis는 간단한게 Config를 통해서 Pub/Sub을 지정할 수 있는 점에서 멀티 서버에서 가장 간단하게 라투어, 중간 브로커 역할로 쓸 수 있는 가장 대표적인 예가 될 것이다.
단, 대용량 트래픽일 경우, 그리고 메시지 저장을 따로 Server 로직에서 구현을 해야한다는 점의 단점으로 가지고 있기 때문에,
관련 구성에 대한 이해를 두고, 다른 메시지 큐를 적용할 때도 사용할 수 있도록 한다.

'Develop Study > Redis' 카테고리의 다른 글
| (24.07.15)[14주차] 프로젝트 중 동시성 제어를 위한 Redisson 활용 (1) | 2024.07.15 |
|---|---|
| (24.07.04)[12주차] Redis 강의 학습 (0) | 2024.07.04 |