다음 코드를 보자(주석을 주의깊게 볼 것)

package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
	public static void main(String[] args) {
		Flux.just("a", "b", "c", "d")  // 1,2,3,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
			.map(s -> {  // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 대문자변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
				System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
				return s.toUpperCase();
			})
			.publishOn(Schedulers.boundedElastic())  //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
			.map(s -> {
				System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
				return s + "!";
			})
			.subscribe(  // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
				s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
	}
}

실행결과

> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
Received A! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received B! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received C! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received D! on Thread: boundedElastic-1

 

 

예제를 조금 수정한 코드를 보자.

package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
	public static void main(String[] args) {
		Flux.just(1, 2, 0, 4)  // 1,2,0,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
			.map(n -> {  // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 나누기변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
				System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
				return 10 / n;
			})
			.publishOn(Schedulers.boundedElastic())  //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
			.map(s -> {
				System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
				return s + "!";
			})
			.doOnNext(s -> {  // 스트림에 영향을 주지 않으면 로깅등 부가작업을 할때 사용
				System.out.println("doOnNext - Thread: " + Thread.currentThread().getName() + " with value: " + s);
			})
			.doOnError(error -> {  // Exception이 발생한 경우만 여기로 떨어진다.
				System.out.println("Error occurred - Thread: " + Thread.currentThread().getName() + " with error: " + error.getMessage());
			})
			.subscribe(  // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
				s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
		// boundElastic스레드로 중간에 분기되기 때문에 이 출력이 마지막 줄이 아닐 수 있다!
		// 자바에서는 main함수가 종료되더라도 다른 스레드가 강제종료되지 않는다!
		System.out.println("main end");
	}
}

실행결과

> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 10!
Received 10! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 5!
Received 5! on Thread: boundedElastic-1
main end
Error occurred - Thread: boundedElastic-1 with error: / by zero

이 예시에서는 onDoNext, onDoError, main end시 스레드 종료 개념을 추가적으로 배울 수 있다.

주석을 주의깊에 보면 되기 때문에 따로 설명은 안하겠다.

반응형

'Programming > JAVA' 카테고리의 다른 글

BiFunction, Function을 사용한 함수형 프로그래밍  (0) 2024.01.01
java Optional  (1) 2023.11.04
java enum  (0) 2023.11.03
IntelliJ 팁  (0) 2023.11.03
java공부  (0) 2023.08.12

+ Recent posts