반응형
Project Reactor의 Flux 예제에서는 데이터가 언제 흐르기 시작하는지, publishOn 이후 어느 스레드에서 실행되는지가 핵심입니다.
이 글은 Flux.just, map, publishOn, subscribe가 어떤 순서로 동작하고 boundedElastic 스케줄러가 어디부터 적용되는지 확인하는 메모입니다.
핵심 정리
Reactor 코드는 선언한 순간 바로 실행되는 것이 아니라 subscribe를 만났을 때 실제 흐름이 시작됩니다. publishOn을 사용하면 그 이후의 연산이 지정한 Scheduler에서 실행됩니다. 예제에서 첫 번째 map은 main 스레드에서 실행되고, publishOn 이후의 map과 subscribe 소비자는 boundedElastic 스레드에서 실행됩니다. 에러가 발생하는 예제에서는 어느 단계에서 예외가 생기고 이후 연산이 어떻게 중단되는지도 함께 확인할 수 있습니다.
- Flux.just는 여러 값을 순서대로 흘려보낼 Publisher를 만듭니다.
- map은 각 아이템을 변환하지만 subscribe 전까지 실제 실행되지 않습니다.
- subscribe는 지연된 스트림 실행을 시작하는 지점입니다.
- publishOn은 그 이후 연산이 실행될 Scheduler를 바꿉니다.
- boundedElastic은 블로킹 작업이나 별도 스레드가 필요한 작업에 자주 쓰입니다.
- 중간 map에서 예외가 나면 이후 연산과 소비 흐름도 영향을 받습니다.
원문은 코드 주석이 좋지만 제목이 reactor #1이라 검색 의도가 약했습니다. 이번 보강은 publishOn과 subscribe 실행 시점을 먼저 잡아 예제 출력의 스레드 이름을 이해하기 쉽게 만들었습니다.
이어서 볼 글
- Reactor Context - Reactor chain 안에서 메타데이터를 전파하는 다음 개념 글이다.
- Spring WebFlux 기본 개념과 Mono, Flux 이해 - Reactor의 Mono와 Flux가 Spring WebFlux에서 어떻게 쓰이는지 이어서 볼 수 있다.
- Java Future 사용법: 비동기 작업 결과 받기 - Reactor와 비교할 Java 기본 비동기 결과 처리 방식이다.
다음 코드를 보자(주석을 주의깊게 볼 것)
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
public static void main(String[] args) {
Flux.just("a", "b", "c", "d") // 1,2,3,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
.map(s -> { // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 대문자변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
return s.toUpperCase();
})
.publishOn(Schedulers.boundedElastic()) //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
.map(s -> {
System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
return s + "!";
})
.subscribe( // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
}
}
실행결과
> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
Received A! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received B! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received C! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received D! on Thread: boundedElastic-1
예제를 조금 수정한 코드를 보자.
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
public static void main(String[] args) {
Flux.just(1, 2, 0, 4) // 1,2,0,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
.map(n -> { // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 나누기변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
return 10 / n;
})
.publishOn(Schedulers.boundedElastic()) //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
.map(s -> {
System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
return s + "!";
})
.doOnNext(s -> { // 스트림에 영향을 주지 않으면 로깅등 부가작업을 할때 사용
System.out.println("doOnNext - Thread: " + Thread.currentThread().getName() + " with value: " + s);
})
.doOnError(error -> { // Exception이 발생한 경우만 여기로 떨어진다.
System.out.println("Error occurred - Thread: " + Thread.currentThread().getName() + " with error: " + error.getMessage());
})
.subscribe( // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
// boundElastic스레드로 중간에 분기되기 때문에 이 출력이 마지막 줄이 아닐 수 있다!
// 자바에서는 main함수가 종료되더라도 다른 스레드가 강제종료되지 않는다!
System.out.println("main end");
}
}
실행결과
> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 10!
Received 10! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 5!
Received 5! on Thread: boundedElastic-1
main end
Error occurred - Thread: boundedElastic-1 with error: / by zero
이 예시에서는 onDoNext, onDoError, main end시 스레드 종료 개념을 추가적으로 배울 수 있다.
주석을 주의깊에 보면 되기 때문에 따로 설명은 안하겠다.
반응형
'Programming > JAVA' 카테고리의 다른 글
| Reactor Context 사용법: contextWrite, deferContextual, ThreadLocal 차이 (0) | 2025.05.27 |
|---|---|
| Java 제네릭 개념: 클래스와 메서드 타입 파라미터 (0) | 2025.05.27 |
| Java BiFunction과 Function: andThen, apply 함수 체이닝 (0) | 2024.01.01 |
| Java Optional 개념과 null 처리 패턴 (1) | 2023.11.04 |
| Java enum 개념: 필드, 생성자, 메서드 사용 (0) | 2023.11.03 |
