다음 코드를 보자(주석을 주의깊게 볼 것)
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
public static void main(String[] args) {
Flux.just("a", "b", "c", "d") // 1,2,3,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
.map(s -> { // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 대문자변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
return s.toUpperCase();
})
.publishOn(Schedulers.boundedElastic()) //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
.map(s -> {
System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
return s + "!";
})
.subscribe( // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
}
}
실행결과
> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
Received A! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received B! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received C! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
Received D! on Thread: boundedElastic-1
예제를 조금 수정한 코드를 보자.
package org.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Main {
public static void main(String[] args) {
Flux.just(1, 2, 0, 4) // 1,2,0,4 네개의 아이템을 순차적으로 방출하는 단일 스트림 생성
.map(n -> { // 스레드 분기 없이 호출 스레드에서 해당 스트림 아이템에 대해서 나누기변환 연산수행(아래 .subscribe를 만나기 전까지 지연됨!)
System.out.println("Map 1 - Thread: " + Thread.currentThread().getName());
return 10 / n;
})
.publishOn(Schedulers.boundedElastic()) //이후 작업은 boundedElastic이라는 사전정의된 별도 스레드에서 하도록 지정함!
.map(s -> {
System.out.println("Map 2 - Thread: " + Thread.currentThread().getName());
return s + "!";
})
.doOnNext(s -> { // 스트림에 영향을 주지 않으면 로깅등 부가작업을 할때 사용
System.out.println("doOnNext - Thread: " + Thread.currentThread().getName() + " with value: " + s);
})
.doOnError(error -> { // Exception이 발생한 경우만 여기로 떨어진다.
System.out.println("Error occurred - Thread: " + Thread.currentThread().getName() + " with error: " + error.getMessage());
})
.subscribe( // 지연 실행을 시작하는 시점이며, 마지막 스케줄러 지정이 boundedElastic이라 boundedElastic 스레드를 통해 수행됨
s -> System.out.println("Received " + s + " on Thread: " + Thread.currentThread().getName()));
// boundElastic스레드로 중간에 분기되기 때문에 이 출력이 마지막 줄이 아닐 수 있다!
// 자바에서는 main함수가 종료되더라도 다른 스레드가 강제종료되지 않는다!
System.out.println("main end");
}
}
실행결과
> Task :Main.main()
Map 1 - Thread: main
Map 1 - Thread: main
Map 1 - Thread: main
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 10!
Received 10! on Thread: boundedElastic-1
Map 2 - Thread: boundedElastic-1
doOnNext - Thread: boundedElastic-1 with value: 5!
Received 5! on Thread: boundedElastic-1
main end
Error occurred - Thread: boundedElastic-1 with error: / by zero
이 예시에서는 onDoNext, onDoError, main end시 스레드 종료 개념을 추가적으로 배울 수 있다.
주석을 주의깊에 보면 되기 때문에 따로 설명은 안하겠다.
반응형
'Programming > JAVA' 카테고리의 다른 글
BiFunction, Function을 사용한 함수형 프로그래밍 (0) | 2024.01.01 |
---|---|
java Optional (1) | 2023.11.04 |
java enum (0) | 2023.11.03 |
IntelliJ 팁 (0) | 2023.11.03 |
java공부 (0) | 2023.08.12 |