가상 상황설명

 

[1] api-server (주문, 송금 등 HTTP 처리)
     └─ KafkaProducer 전송 (TransferRequested)

[2] transfer-processor (Kafka Consumer, 송금 처리 전용)
     └─ KafkaProducer (TransferSucceeded)

[3] socket-gateway (WebSocket 전용)
     └─ KafkaConsumer (TransferSucceeded 수신)
     └─ 사용자에게 push

 

 

WebSocket push

1. Client가 WebSocket으로 Push서버에 먼저연결

2. Push서버는 연결된 세션을 Map처럼 관리(userId -> session)

3. Kafka에서 메시지를 수신하면, 해당 userId에 연결된 세션을 찾아서 메시지 전송

 

코드예시

WebSocket설정

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new MyWebSocketHandler(), "/ws")
                .setAllowedOrigins("*");
    }
}

 

WebSocketHandler

@Component
public class MyWebSocketHandler extends TextWebSocketHandler {

    // userId -> session 저장 (concurrent주의)
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // 쿼리 파라미터에서 userId 추출 (예시)
        String userId = getUserIdFromSession(session);
        sessions.put(userId, session);
    }

    public void sendToUser(String userId, String message) throws IOException {
        WebSocketSession session = sessions.get(userId);
        if (session != null && session.isOpen()) {
            session.sendMessage(new TextMessage(message));
        }
    }

    private String getUserIdFromSession(WebSocketSession session) {
        URI uri = session.getUri(); // 예: ws://localhost:8080/ws?userId=wonil
        return UriComponentsBuilder.fromUri(uri).build().getQueryParams().getFirst("userId");
    }
}

 

Kafka Consumer → WebSocket 전송 연동

@Component
public class KafkaResultConsumer {

    private final MyWebSocketHandler webSocketHandler;

    public KafkaResultConsumer(MyWebSocketHandler handler) {
        this.webSocketHandler = handler;
    }

    @KafkaListener(topics = "TransferSucceeded")
    public void onTransferSuccess(String messageJson) throws IOException {
        JSONObject obj = new JSONObject(messageJson);
        String userId = obj.getString("userId");
        String msg = obj.getString("message");

        webSocketHandler.sendToUser(userId, msg);
    }
}

 

클라이언트 (JavaScript)

<script>
  const userId = "wonil";  // 로그인된 사용자 ID라고 가정
  const socket = new WebSocket("ws://localhost:8080/ws?userId=" + userId);

  socket.onmessage = function(event) {
    console.log("서버로부터 수신된 메시지:", event.data);
    alert(event.data);  // 예: "송금 완료!"
  };
</script>

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

반응형

'Programming > SpringBoot' 카테고리의 다른 글

webflux  (0) 2025.05.31
Spring에서 graphQL적용하기  (0) 2023.10.13
Spring에서 gRPC연동하기  (2) 2023.10.11
maven dependency  (0) 2023.10.11
Spring Boot에서 endpoint접근을 http에서 https로 변경하기  (0) 2023.10.09

+ Recent posts