카테고리 없음
(25.10.01) Reactor 기반의 비동기 처리 로직
회사에서 코드를 보면서 어러 서버의 통신으로 개발이 진행되면서,
TPS 와 연동하는데 있어서 Java의 Reactor 기반으 작성된 코드를 보고 활용할 필요가 있었다.
낯설 수 있는 Reactor에 대해서 사용 방법과 구조를 나름대로 정리해보고 찾아보면서 기록하고자 했다.
Reactor
- Project Reactor
- JVM 기반 비동기/논블로킹 프로그래밍을 위한 리액티브 라이브러리
- 대표 타입들을 통해서 비동기 처리 로직을 구현
- Reactive Streams 표준(Publisher, Subscriber, Subscription) 구현체 중 하나
- scheduler와 함께 연산자와 사용해 활용할 수 있음
- .subscribeOn(scheduler) → 구독 시점 및 업스트림 실행 컨텍스트 지정
- .publishOn(scheduler) → 다운스트림 실행 컨텍스트를 바꿔줌.
- scheduler와 함께 연산자와 사용해 활용할 수 있음
- 대표 타입
- Mono<T> : 최대 1개의 데이터 발행 (0 또는 1)
- Flux<T> : 0개 이상 여러 개의 데이터 발행
Reactor의 사용
- 비동기 작업을 선언적으로 표현 ex) 네트워크 I/O, DB 호출
- 데이터 흐름(파이프라인) 을 연산자(map, flatMap, timeout, onErrorResume 등)로 조립해서 사용
- 작동 방법
- 결과는 즉시 계산되지 않고, 구독(subscribe) 해야 실행되는 작동 → 지연 실행(Lazy evaluation).
- 논블로킹 → 스레드 효율적 사용가능할 수 있도록 함
Reactor(w/ scheduler) 코드 리뷰
public class msgClientManager {
private final MsgSessionsManager msgSessionsManager;
private final AtomicInteger roundRobinCounter = new AtomicInteger(0);
public Mono<String> sendMessage(String message) {
List<MsgSession> actives = msgSessionsManager.getActiveSessions();
if (actives.isEmpty()) {
return Mono.error(new IllegalStateException("사용가능 세션 없음"));
}
int start = Math.floorMod(roundRobinCounter.getAndIncrement(), actives.size());
return actives.get(start).send(message)
.timeout(Duration.ofMillis(msgClientProperties.getRestTimeOt()))
.onErrorResume(TimeoutException.class, e -> {
return Mono.empty();
});
}
}
public void executeSendProcess() throw Exception {
...
Mono<String> result = msgClientManager.sendMessage(extractJsonUrl(jsonObj));
result.subscribeOn(Schedulers.boundedElastic())
.subscribe(
msgService::handleServerResponse, // handleServerResponse 메서드 내 오류 처리
error -> {
log.error("처리중 에러발생");
}
);
...
Reactor 를 사용한 Data Flow
메시지 세션을 활용해 메시지를 보내는 Process를 예시
- executeSendProcess() → extractJsonUrl(jsonObj)로 메시지(문자열) 추출
- msgClientManager.sendMessage(message) 호출
- → 내부에서 활성 세션 하나를 round-robin으로 골라 MsgSession.send(message)를 실행 : Mono<String>을 반환
- 그 Mono에 timeout(...)과 onErrorResume(TimeoutException, ...)를 붙여 반환
- executeSendProcess()에서 subscribeOn(Schedulers.boundedElastic())로 구독을 시작하고, 성공 시 msgService::handleServerResponse 호출, 에러 시 로깅
Round-robin 인덱스 계산
int start = Math.floorMod(roundRobinCounter.getAndIncrement(), actives.size());
- roundRobinCounter는 AtomicInteger라 여러 스레드에서 안전하게 증가
- Java에서 특정 int값을 원자적으로 읽기/쓰기/증감/비교/업데이트 할 수 있는 타입
- 경쟁조건 없이 volatile과 CPU 자체적인 명령을 통해 내부에서 처리할 수 있도록 함(일종의 synchronized) → 라운드로빈을 위한 값이 꼬이지 않도록 순환 분배가 잘 이뤄질 수 있도록 함
- getAndIncrement()는 이전 값을 반환하고 증가
- Math.floorMod(..., actives.size())를 사용해 음수(AtomicInteger 오버플로우 시)도 안전하게 0..size-1 범위의 인덱스처리
실제 세션에 전송
return actives.get(start).send(message)
.timeout(Duration.ofMillis(msgClientProperties.getRestTimeOt()))
.onErrorResume(TimeoutException.class, e -> Mono.empty());
- actives.get(start).send(message)는 Mono<String>을 반환한다고 가정(네트워크 I/O 결과 혹은 세션 응답)
- .timeout(...) : 지정된 시간(밀리초) 안에 Mono가 성공(또는 에러) 신호를 내지 않으면 타임아웃 예외
- 리액터의 timeout은 타임아웃 시 java.util.concurrent.TimeoutException
- .onErrorResume(TimeoutException.class, e -> Mono.empty()) : 타임아웃이 발생하면 Mono.empty() 값 없음(정상 완료) 대체
- onNext는 호출되지 않고 onComplete만 발생
구독/스케줄링
// 위의 일련의 과정을 통한 Mono<String> 결과를 받음
Mono<String> result = msgClientManager.sendMessage(extractJsonUrl(jsonObj));
result.subscribeOn(Schedulers.boundedElastic())
.subscribe(
msgService::handleServerResponse, // onNext
error -> { log.eror("처리중 에러발생"); } // onError (typo: eror -> error)
);
- **subscribeOn(Schedulers.boundedElastic())**는 구독(subscribe) 발생 시점과 upstream 연산(=send 포함)을 boundedElastic 스케줄러에서 실행
- result를 그냥 마무리 할 수는 X 구독을 통해서 Scheduler 처리를 해야함
- Scheduler.boundedElastic
- Reactor 제공 스레드 풀 스케쥴러 중 하나
- 블로킹 I/O 적합 스케쥴러
- 일반적인 parallel() 스케쥴러는 논블로킹시 병렬처리리가 가능하나, 모든 스레드를 사용하면서 연산 처리 능력에 의한 성능 제한이 발생할 수 있음 : 막힘
- 블로킹을 사용해서 하나의 요청 세션에 대해 필요한 만큼의 스레드를 확보해서 재사용하면서 스레드 풀 내에서만 기다리면서 사용할 수 있도록 스케쥴러가 배정
- 다른 작업에 영향을 줄 수 없음
- .subscribe(onNext, onError) :
- 성공적으로 String이 발행되면 msgService.handleServerResponse(String)
- **Mono.error(...)**타임아웃 외의 에러가 발생하면 error 람다가 호출됩
- 참고) Mono.empty()(타임아웃으로 대체된 경우)는 onNext를 호출하지 않고 onComplete만 발생
- onComplete 콜백을 등록하지 않았으므로 그냥 아무 동작 없는 것으로 간주 → 로깅처리만 했음
Java 문법과 프로그래밍만 이해하는 것 이외에 전체적인 Server 간 구조에도 이해할 수 있을 필요가 있다고 생각하며,
위의 Reactor 및 스케쥴러 역시 비동기 블로킹 통신에서 중요하므로 이를 활용할 수 있도록 해야할 것이다.
