Spring/Spring Stomp

[Spring WebSockets] 2-3. STOMP: 부가 기능

noahkim_ 2025. 5. 4. 20:21

1. Order of Messages

  • 브로커에서 클라이언트로 보내는 메시지는 clientOutboundChannel을 통해 전달됩니다.
  • 이 채널은 내부적으로 ThreadPoolExecutor(멀티스레드 풀)로 운영되기 때문에, 메시지들이 여러 스레드에서 처리됩니다.
  • 그래서 클라이언트가 실제로 받는 메시지 순서가 서버에서 보낸 순서와 다를 수 있습니다.

 

설정) Outbound 순서 보장

더보기
@Configuration
@EnableWebSocketMessageBroker
public class PublishOrderWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 기타 설정...
        registry.setPreservePublishOrder(true);
    }
}
  • 같은 클라이언트 세션 내에서 메시지가 하나씩 순서대로 전달됨

 

설정) Inbound 순서 보장

더보기
@Configuration
@EnableWebSocketMessageBroker
public class ReceiveOrderWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.setPreserveReceiveOrder(true);
    }
}
  • 같은 클라이언트 세션 내에서 서버가 메시지를 받은 순서 그대로 처리함

 

2. Events

  • Spring WebSocket에서는 다양한 ApplicationContext 이벤트가 발생합니다.
  • 이 이벤트들은 Spring의 ApplicationListener 인터페이스를 구현해서 받을 수 있습니다.

 

이벤트

이벤트 발생 시점 설명
BrokerAvailabilityEvent 브로커 연결/끊김 발생 시
브로커 연결 상태 변화 알림
SessionConnectEvent 클라이언트가 CONNECT 요청 시 새 세션 시작 알림
SessionConnectedEvent 브로커가 CONNECTED 응답 시
세션 완전히 연결 완료
SessionSubscribeEvent SUBSCRIBE 요청 시 구독 시작 알림
SessionUnsubscribeEvent UNSUBSCRIBE 요청 시 구독 해제 알림
SessionDisconnectEvent 세션 종료 시
세션 종료 알림
- 중복 발생 가능 (클라이언트 요청 + 타임 아웃 등)
- ⚠️ 하나의 세션에 대해 여러번 인터셉트 될 수 있음
- ✅ idempotent하게 리스너 핸들러 작성 필수
  • 외부 STOMP Broker Relay를 써도 똑같이 이벤트가 발생함

 

3. Interception

  • STOMP 연결의 생명주기(CONNECT, DISCONNECT)는 Event로 알림 받을 수 있다.
  • 하지만 모든 클라이언트 메시지(SEND, SUBSCRIBE, UNSUBSCRIBE 등)를 감시하려면 Interceptor를 써야 한다.

 

ChannelInterceptor

  • 클라이언트가 보낸 모든 메시지 가로챌 수 있음

 

예제

더보기
public class MyChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        // command 종류에 따라 처리 (CONNECT, SUBSCRIBE, SEND, DISCONNECT 등)
        return message;
    }
}
  • StompHeaderAccessor를 이용해 메시지의 STOMP 헤더를 읽을 수 있다.

 

ExecutorChannelInterceptor

  • ChannelInterceptor의 하위타입
  • 메시지가 실제로 핸들러(Consumer)에서 처리될 때도 가로챌 수 있음.

 

예제

더보기
public class MyExecutorChannelInterceptor implements ExecutorChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        System.out.println("[preSend] 메시지가 채널로 보내지기 직전: " + message);
        return message;
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        System.out.println("[afterSendCompletion] 채널로 메시지 보낸 후: " + message);
    }

    @Override
    public Runnable beforeHandle(Message<?> message, MessageChannel channel, Runnable task) {
        System.out.println("[beforeHandle] 핸들러가 메시지를 처리하기 직전: " + message);
        return task;
    }

    @Override
    public void afterMessageHandled(Message<?> message, MessageChannel channel, Runnable task, Exception ex) {
        System.out.println("[afterMessageHandled] 핸들러가 메시지를 처리한 후: " + message);
    }
}

 

설정

더보기
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new MyChannelInterceptor());
    }
}

 

4. WebSocket Scope

WebSocket 세션과 속성 맵

  • 각 WebSocket 세션에는 Map<String, Object> 형태의 속성 맵이 존재함.
  • 이 속성 맵은 클라이언트가 보낸 메시지의 헤더에 포함되어 전송됨.
  • HttpSessionHandshakeInterceptor를 추가할 경우, 속성 맵에 세션 정보를 추가함

 

예제) SimpMessageHeaderAccessor

더보기

컨트롤러

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handle(SimpMessageHeaderAccessor headerAccessor) {
        Map<String, Object> attrs = headerAccessor.getSessionAttributes();
        // 세션에 저장된 속성들 사용 가능
    }
}

 

Interceptor

public class MyInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            accessor.getSessionAttributes().put("userId", "12345");
            accessor.getSessionAttributes().put("nickname", "test");
        }
        
        return message;
    }
}
  • 직접 추가 가능

 

WebSocket 범위

  • 스프링에서는 "websocket" 스코프를 사용해 WebSocket 세션마다 별도의 빈을 생성할 수 있다.
    • WebSocket 세션의 속성 맵에 저장한다.
  • 이 빈은 WebSocket 세션 동안만 살아있고, 세션이 끝나면 파괴된다.
    • 세션 종료 시, @PreDestroy 메서드를 호출해서 자원 정리를 한다.
  • 주로 컨트롤러나 채널 인터셉터에서 이 빈을 주입받아 사용한다.

 

예제

더보기
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope("websocket")  // WebSocket 스코프 적용
public class WebSocketSession {

    private String username;

    // 세션 시작 시 클라이언트 이름 저장
    public void setUsername(String username) {
        this.username = username;
    }

    // 세션 중 클라이언트의 이름을 가져옴
    public String getUsername() {
        return username;
    }
}
@Controller
public class WebSocketController {

    // 클라이언트에서 메시지 전송 시 WebSocketSession에 사용자 이름 저장
    @MessageMapping("/chat.join")
    public void joinChat(String username, SimpMessageHeaderAccessor headerAccessor) {
        // WebSocketSession 빈을 주입받아 사용자 정보 설정
        WebSocketSession session = headerAccessor.getSessionAttributes().get("webSocketSession");
        if (session != null) {
            session.setUsername(username);
        }
    }

    // 클라이언트로부터 받은 메시지를 모든 클라이언트에게 브로드캐스트
    @MessageMapping("/chat.send")
    @SendTo("/topic/public")
    public String sendMessage(String message) {
        return message;
    }
}

 

5. Performance

성능은 단일 요소로 결정되지 않음

  • 웹소켓 성능은 여러 요소에 의해 좌우됨
  • ✅ 메시지 크기, 메시지 갯수, 네트워크 속도, 외부시스템 대기 등
  • ➡️ 단순히 서버 스레드를 늘리거나 브로커를 스케일업한다 해서 해결되지 않음

 

스레드 풀 기반 메시지 처리

  • 메시지는 내부적으로 채널을 통해 전달되고, 이 채널들은 스레드 풀 기반 비동기 실행 위에 동작함
  • CPU-Bound 작업은 CPU 개수 근처가 적당함 (많이 늘려봤자 경쟁 상황만 발생하기 쉬움)

 

ThreadPoolExecutor

  • 각 채널에 등록할 수 있음
  • corePoolSize: 기본적으로 유지하는 스레드 수
  • maxPoolSize: 큐가 꽉 찬 이후 추가로 늘어날 수 있는 최대 스레드 수
  • queueCapacity: 바로 처리하지 못한 작업을 쌓아두는 큐 크기.

 

clientOutboundChannel

  • 클라이언트에게 메시지를 보내는 역할
  • 한 클라이언트에 대해 오직 하나의 스레드만 전송에 사용됨
  • 한 클라이언트에 대한 후속 메시지들은 모두 버퍼에 쌓임
  • 클라이언트가 느리면 전송 시간이 길어져 시스템 부담이 커짐
  • 이를 보완하기 위해 추가적인 설정을 제공함

 

설정) sendTimeLimit, sendBufferSizeLimit

더보기
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000)
                    .setSendBufferSizeLimit(512 * 1024);
    }

}
  • sendTimeLimit: 메시지 하나를 보내는데 드는 최대 시간
  • sendBufferSizeLimit: 전송이 늦어져서 아직 못 보낸 메시지가 버퍼에 쌓일 때까지 허용할 최대 크기

 

설정) MessageSizeLimit

더보기
@Configuration
@EnableWebSocketMessageBroker
public class MessageSizeLimitWebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

}
  • 메시지 당 사이즈 제한이 있음 (WAS 상에서 크기 제한을 둠)
  • 큰 메시지는 여러 조각으로 나눠 보내지며 서버는 이를 다시 모아서 재조립해야함

 

 

 

출처