반응형

Project Reactor의 Flux 예제에서는 데이터가 언제 흐르기 시작하는지, publishOn 이후 어느 스레드에서 실행되는지가 핵심입니다.

이 글은 Flux.just, map, publishOn, subscribe가 어떤 순서로 동작하고 boundedElastic 스케줄러가 어디부터 적용되는지 확인하는 메모입니다.

 

핵심 정리

Reactor 코드는 선언한 순간 바로 실행되는 것이 아니라 subscribe를 만났을 때 실제 흐름이 시작됩니다. publishOn을 사용하면 그 이후의 연산이 지정한 Scheduler에서 실행됩니다. 예제에서 첫 번째 map은 main 스레드에서 실행되고, publishOn 이후의 map과 subscribe 소비자는 boundedElastic 스레드에서 실행됩니다. 에러가 발생하는 예제에서는 어느 단계에서 예외가 생기고 이후 연산이 어떻게 중단되는지도 함께 확인할 수 있습니다.

  • Flux.just는 여러 값을 순서대로 흘려보낼 Publisher를 만듭니다.
  • map은 각 아이템을 변환하지만 subscribe 전까지 실제 실행되지 않습니다.
  • subscribe는 지연된 스트림 실행을 시작하는 지점입니다.
  • publishOn은 그 이후 연산이 실행될 Scheduler를 바꿉니다.
  • boundedElastic은 블로킹 작업이나 별도 스레드가 필요한 작업에 자주 쓰입니다.
  • 중간 map에서 예외가 나면 이후 연산과 소비 흐름도 영향을 받습니다.

원문은 코드 주석이 좋지만 제목이 reactor #1이라 검색 의도가 약했습니다. 이번 보강은 publishOn과 subscribe 실행 시점을 먼저 잡아 예제 출력의 스레드 이름을 이해하기 쉽게 만들었습니다.

이어서 볼 글

 

다음 코드를 보자(주석을 주의깊게 볼 것)

package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
	public static void main(String[] args) {
		Flux.just("a", "b", "c", "d")  // 1,2,3,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
			.map(s -> {  // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 대문자변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
				System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
				return s.toUpperCase();
			})
			.publishOn(Schedulers.boundedElastic())  //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
			.map(s -> {
				System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
				return s + "!";
			})
			.subscribe(  // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
				s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
	}
}

실행결과

> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
Received A! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received B! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received C! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received D! on Thread: boundedElastic-1

예제를 조금 수정한 코드를 보자.

package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
	public static void main(String[] args) {
		Flux.just(1, 2, 0, 4)  // 1,2,0,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
			.map(n -> {  // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 나누기변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
				System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
				return 10 / n;
			})
			.publishOn(Schedulers.boundedElastic())  //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
			.map(s -> {
				System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
				return s + "!";
			})
			.doOnNext(s -> {  // 스트림에 영향을 주지 않으면 로깅등 부가작업을 할때 사용
				System.out.println("doOnNext - Thread: " + Thread.currentThread().getName() + " with value: " + s);
			})
			.doOnError(error -> {  // Exception이 발생한 경우만 여기로 떨어진다.
				System.out.println("Error occurred - Thread: " + Thread.currentThread().getName() + " with error: " + error.getMessage());
			})
			.subscribe(  // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
				s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
		// boundElastic스레드로 중간에 분기되기 때문에 이 출력이 마지막 줄이 아닐 수 있다!
		// 자바에서는 main함수가 종료되더라도 다른 스레드가 강제종료되지 않는다!
		System.out.println("main end");
	}
}

실행결과

> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 10!
Received 10! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 5!
Received 5! on Thread: boundedElastic-1
main end
Error occurred - Thread: boundedElastic-1 with error: / by zero

이 예시에서는 onDoNext, onDoError, main end시 스레드 종료 개념을 추가적으로 배울 수 있다.

주석을 주의깊에 보면 되기 때문에 따로 설명은 안하겠다.

반응형

+ Recent posts