Reactor Context: “리액터 체인 전체가 공유하는, 키-값 형태의 작은 Map”
- ThreadLocal처럼 코드를 통해 값을 전달하지 않고도 다운스트림(아래쪽) 연산자들이 값을 꺼내 쓸 수 있음
- 하지만 쓰레드에 묶이지 않고 Reactive 시퀀스와 함께 이동하므로, 쓰레드가 바뀌어도 그대로 보존됨
Mono<String> pipeline =
Mono.just("payload")
// ── 여기까지는 일반 Mono ──
.flatMap(data -> Mono.deferContextual(ctx ->
// Context 에 담긴 userId 꺼내서 활용
Mono.just("User " + ctx.get("userId") + " processed " + data)
))
// ── contextWrite 는 이 지점부터 downstream 에 적용 ──
.contextWrite(ctx -> ctx.put("userId", "alice123"));
위처럼 사용한다. deferContextual대신에 subscriberContext써도 된다.
Flux.just(...)
.doOnNext(...) // (1) 데이터 신호 흐를 때 requestId 읽기
.map(...) // (2) 다른 연산자
.contextWrite(...) // (3) 구독 직전 Context 설정
.subscribe() // (4) 구독 신호 발생 → (3) → (2) → (1) → 소스
// A. 올바른 예 – 읽는 쪽보다 "아래"에 contextWrite
Mono.deferContextual(ctx -> Mono.just(ctx.get("key")))
.contextWrite(Context.of("key", "v")); // ✔
// B. 잘못된 예 – contextWrite를 먼저 써버림
Mono<String> wrong =
Mono.just("x")
.contextWrite(Context.of("key", "v")) // 여기선 아직 값 읽지 않음
.flatMap(v -> Mono.deferContextual(ctx -> Mono.just(ctx.get("key"))));
// → NoSuchElementException (key를 못 찾음)
위 부분이 무지하게 햇갈렸다. .contextWrite()가 map에 insert하는건데.. 체인상 가장 나중에 해줘야 동작하고 이전에 해주면 동작안한다. data흐름과 호출흐름이 반대라서 그런거 같은데 recursive의 원리를 생각해보면 될듯
ThreadLocal과의 차이
엄청 헷갈리지만 contextWrite()가 체인 출구쪽에 기록해줘야 함에 비해서 이건 체인 입구쪽에 set해줘야 제대로 읽힌다.
근데 React에서는 스레드 변경가능성을 고려해야하므로 어차피 안쓰는게 좋음
타이밍 비교
|
ThreadLocal |
Reactor Context |
세팅 시점 |
체인(흐름) 입구 에서 set() |
체인 출구(끝) 에서 .contextWrite() |
전달 범위 |
“같은 스레드” 에서만 유지 |
“Reactive 시그널” 과 함께 끝까지 전파 |
정리 시점 |
처리 후 remove() 반드시 필요 |
불변 객체라, 별도 정리 불필요 |
import reactor.core.publisher.Mono;
import java.util.UUID;
public class ThreadLocalExample {
// ① ThreadLocal 선언
private static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();
public Mono<String> process(String input) {
// ② 체인 입구에서 set()
REQUEST_ID.set(UUID.randomUUID().toString());
return Mono.just(input)
// ③ map 내부에서 읽기
.map(v -> "ThreadLocal → requestId="
+ REQUEST_ID.get()
+ ", payload=" + v)
// ④ 체인 종료 직전에 반드시 remove()
.doFinally(sig -> REQUEST_ID.remove());
}
public static void main(String[] args) {
new ThreadLocalExample()
.process("Hello")
.subscribe(System.out::println);
}
}
ThreadLocal예시
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.time.Duration;
import java.util.UUID;
public class ReactorContextExample {
public Mono<String> process(String input) {
return Mono.just(input)
// ① 실제 읽는 부분(deferContextual 또는 subscriberContext)
.flatMap(v -> Mono.deferContextual(ctx ->
Mono.just("ReactorContext → requestId="
+ ctx.get("requestId")
+ ", payload=" + v)))
// ② 체인 “출구” 에서 contextWrite
.contextWrite(ctx -> ctx.put("requestId", UUID.randomUUID().toString()));
}
public static void main(String[] args) {
new ReactorContextExample()
.process("Hello")
.subscribe(System.out::println);
}
}
React Context예시