[Spring] Reactive Web (4)

CompletableFuture

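  • Available since Java 8.
  • Ways to represent the result of an asynchronous task:
    • Future -> an object that holds the result of an asynchronous task
    • ListenableFuture -> handles the result through callbacks
    • CompletableFuture -> lets you compose and chain a variety of asynchronous tasks with little code
  • Implements both Future and CompletionStage.
    • CompletionStage -> provides the ability to run further tasks that depend on a completed result.
  • Can run its tasks on a separate thread pool (an Executor passed to the *Async methods) instead of the default ForkJoinPool.commonPool().
  • You can choose between waiting for every future in a list to complete (allOf) or for just one of them (anyOf).
  • Lambda expressions and pipelining help avoid callback hell.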
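The "wait for all" and "wait for any one" options, together with a dedicated thread pool, can be sketched roughly as follows. The class name AllOfAnyOfSketch and its helper methods are made up for illustration; only allOf, anyOf, supplyAsync, and join come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class AllOfAnyOfSketch {

    // Waits for every future in the list and collects the results in list order.
    static List<Integer> waitForAll(List<CompletableFuture<Integer>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // allOf carries no values itself, so each future is read after all have completed.
        return all.thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    // Returns the result of whichever future completes first.
    static Object waitForAny(List<CompletableFuture<Integer>> futures) {
        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])).join();
    }

    public static void main(String[] args) {
        // Assumption: a small dedicated pool instead of the common ForkJoinPool.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<CompletableFuture<Integer>> futures = Arrays.asList(
                    CompletableFuture.supplyAsync(() -> 1, pool),
                    CompletableFuture.supplyAsync(() -> 2, pool),
                    CompletableFuture.supplyAsync(() -> 3, pool));
            System.out.println("all: " + waitForAll(futures));
            System.out.println("any: " + waitForAny(futures));
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that allOf returns CompletableFuture<Void>, so the individual results still have to be read from the original futures, and anyOf returns Object, so a cast is needed if the typed value is required.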

Provided methods

  • CompletableFuture.completedFuture() -> creates a CompletableFuture that is already completed with the given value.
CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1);
System.out.println(f1.get());
Output: 1
  • completeExceptionally() -> completes the future with an exception.
CompletableFuture<Integer> f3 = new CompletableFuture<>();
f3.completeExceptionally(new RuntimeException()); // the future now holds the exception
System.out.println(f3.get()); // the exception is only thrown when get() is called
Output:
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException
  • runAsync() -> takes a Runnable: runs an asynchronous task that produces no result
CompletableFuture.runAsync(() -> {
	log.info("runAsync");
});
log.info("exit");
Output:
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - exit
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - runAsync
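  • thenRun() -> runs the next action once the previous stage completes: it usually runs on the thread that completed the previous stage, or on the calling thread if that stage had already completed, so the same thread is typical but not strictly guaranteed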
CompletableFuture
        .runAsync(() -> log.info("runAsync"))
        .thenRun(() -> log.info("thenRunAsync1"))
        .thenRun(() -> log.info("thenRunAsync2"));
Output:
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - runAsync
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenRunAsync1
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenRunAsync2
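Which thread runs a thenRun() action depends on whether the previous stage has already completed when the action is registered. The sketch below shows the second case: the future is already complete, so the action is expected to run on the calling thread rather than on a pool worker. ThenRunThreadSketch and its method are illustrative names only, and the behavior described is that of the OpenJDK implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ThenRunThreadSketch {

    // Registers thenRun on an already-completed future and reports which thread ran it.
    static String threadOfThenRunOnCompletedFuture() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture.completedFuture("done")
                .thenRun(() -> ranOn.set(Thread.currentThread().getName()))
                .join();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller : " + Thread.currentThread().getName());
        System.out.println("thenRun: " + threadOfThenRunOnCompletedFuture());
    }
}
```

If the thread matters, the thenRunAsync() variant with an explicit Executor makes the choice explicit.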
  • supplyAsync() -> takes a Supplier: runs an asynchronous task that produces a result
  • thenApply() -> takes a Function: transforms the result of the previous stage and returns a new value
  • thenAccept() -> takes a Consumer: consumes the result and returns nothing
CompletableFuture
        .supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        })
        .thenApply(res -> {
            log.info("thenApply {}", res);
            return res + 1;
        })
        .thenAccept(res -> log.info("thenAccept {}", res));
Output:
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - supplyAsync
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenApply 1
[ForkJoinPool.commonPool-worker-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAccept 2
  • thenCompose() -> used when the function itself returns a CompletableFuture: plays a role similar to flatMap
CompletableFuture
        .supplyAsync(() -> {
            return 1;
        })
        .thenCompose(res -> {
            log.info("thenCompose {}", res);
            return CompletableFuture.completedFuture( res + 1);
        })
        .thenAccept(res -> log.info("thenAccept {}", res));
Output:
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAccept 2
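The flatMap comparison is easiest to see by passing the same future-returning function to thenApply() and to thenCompose(). This is a minimal sketch; plusOneAsync is a hypothetical helper, not something from the original post.

```java
import java.util.concurrent.CompletableFuture;

public class ThenComposeSketch {

    // Hypothetical async step: returns its input plus one, wrapped in a future.
    static CompletableFuture<Integer> plusOneAsync(int value) {
        return CompletableFuture.supplyAsync(() -> value + 1);
    }

    // thenApply treats the returned future as a plain value, so the type becomes nested.
    static CompletableFuture<CompletableFuture<Integer>> nested() {
        return CompletableFuture.completedFuture(1)
                .thenApply(ThenComposeSketch::plusOneAsync);
    }

    // thenCompose flattens the returned future into the pipeline.
    static CompletableFuture<Integer> flattened() {
        return CompletableFuture.completedFuture(1)
                .thenCompose(ThenComposeSketch::plusOneAsync);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // two joins needed to reach the value
        System.out.println(flattened().join());     // one join is enough
    }
}
```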
  • exceptionally() -> handles an error raised in an earlier stage and supplies a fallback value
CompletableFuture
        .supplyAsync(() -> {
            return 1;
        })
        .thenCompose(res -> {
            log.info("thenCompose {}", res);
            if (res == 1) throw new RuntimeException();
            return CompletableFuture.completedFuture( res + 1);
        })
        .exceptionally(e -> {
            return -10;
        })
        .thenAccept(res -> log.info("thenAccept {}", res));
Output:
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
[main] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAccept -10
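To make the recovery path easier to follow, here is a small self-contained sketch in which the first stage either fails or succeeds depending on a flag. The class and method names are illustrative; the fallback value -10 mirrors the example above.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallySketch {

    // Fails in the first stage when 'fail' is true, then recovers with a fallback of -10.
    static int compute(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) throw new IllegalStateException("boom");
                    return 1;
                })
                .thenApply(res -> res + 1)   // skipped when the previous stage failed
                .exceptionally(ex -> -10)    // only invoked on failure
                .join();
    }

    public static void main(String[] args) {
        System.out.println("failure path: " + compute(true));
        System.out.println("success path: " + compute(false));
    }
}
```

On the success path exceptionally() simply passes the value through, so it can sit at the end of a pipeline without affecting normal results.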
  • thenApplyAsync() -> takes a Function and runs it as a separate asynchronous task, on the given Executor if one is passed and on the default pool otherwise
ExecutorService es = Executors.newFixedThreadPool(10);

CompletableFuture
        .supplyAsync(() -> {
            log.info("supplyAsync");
            return 1;
        }, es)
        .thenCompose(res -> {
            log.info("thenCompose {}", res);
            return CompletableFuture.completedFuture(res);
        })
        .thenApplyAsync(res -> {
            log.info("thenApplyAsync {}", res);
            return res + 10;
        }, es)
        .thenAcceptAsync(res -> log.info("thenAcceptAsync {}", res), es);
Output:
[pool-1-thread-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - supplyAsync
[pool-1-thread-1] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenCompose 1
[pool-1-thread-2] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenApplyAsync 1
[pool-1-thread-3] INFO reactivestudy.springreactivestudy.reactive.async.v4.CompletableFutureEx - thenAcceptAsync 11
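One detail the snippet above leaves out is shutting the executor down; threads created by Executors.newFixedThreadPool are non-daemon, so the JVM stays alive until the pool is shut down. A minimal sketch, assuming a single-thread pool whose thread is named "sketch-pool" (both the class name and the thread name are made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyAsyncSketch {

    // Runs both stages on a single-thread pool named "sketch-pool" and returns
    // the name of the thread that executed the thenApplyAsync stage.
    static String threadOfAsyncStage() {
        ExecutorService es =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-pool"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> 1, es)
                    .thenApplyAsync(res -> Thread.currentThread().getName(), es)
                    .join();
        } finally {
            es.shutdown(); // release the pool so its thread does not keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfAsyncStage());
    }
}
```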

* With CompletableFuture, the code completed in the previous post can be written more concisely, as follows.

@Slf4j
@RestController
@RequestMapping("/v4")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV4 {
    public static final String Service1Uri = "http://localhost:9090/remote-service?req={req}";
    public static final String Service2Uri = "http://localhost:9090/remote-service2?req={req}";
    private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();

    // Use Netty, configured to run on a single thread
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );

    private final MyServiceV2 myServiceV2;

    public SpringAsyncControllerV4(MyServiceV2 myServiceV2) {
        this.myServiceV2 = myServiceV2;
    }

    /**
     * Case where each call depends on the result of the previous one
     */
    @GetMapping("/rest4")
    public DeferredResult<String> rest4(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        toCompletableFuture(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
                .thenCompose(res ->
                        toCompletableFuture(asyncRestTemplate2.getForEntity(Service2Uri, String.class, res.getBody()))
                )
                .thenCompose(res ->
                        toCompletableFuture(myServiceV2.work(res.getBody()))
                )
                .thenAccept(res -> dr.setResult(res))
                .exceptionally(ex -> { dr.setErrorResult(ex.getMessage()); return null; });

        return dr;
    }

    <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();

        lf.addCallback(
                res -> completableFuture.complete(res),
                ex -> completableFuture.completeExceptionally(ex)
        );
        return completableFuture;

    }
}
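The toCompletableFuture() helper is a general pattern: create an empty CompletableFuture and complete it from the callbacks of another API. The sketch below shows the same idea without any Spring dependency. CallbackSource is a hypothetical stand-in for a callback-style future such as ListenableFuture, not a real library type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackAdapterSketch {

    // Hypothetical stand-in for a callback-style future such as ListenableFuture.
    interface CallbackSource<T> {
        void addCallback(Consumer<T> onSuccess, Consumer<Throwable> onFailure);
    }

    // Bridges the callback API into a CompletableFuture so it can be chained.
    static <T> CompletableFuture<T> toCompletableFuture(CallbackSource<T> source) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        source.addCallback(cf::complete, cf::completeExceptionally);
        return cf;
    }

    public static void main(String[] args) {
        CallbackSource<String> ok =
                (onSuccess, onFailure) -> onSuccess.accept("hello");
        CallbackSource<String> failing =
                (onSuccess, onFailure) -> onFailure.accept(new RuntimeException("down"));

        System.out.println(
                toCompletableFuture(ok).thenApply(String::toUpperCase).join());
        System.out.println(
                toCompletableFuture(failing)
                        .exceptionally(ex -> "fallback: " + ex.getMessage())
                        .join());
    }
}
```

Once wrapped this way, a callback-based call can take part in thenCompose() chains and share a single exceptionally() handler, which is what the controller above relies on.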

* When the service logic is synchronous

@Service
public class MyServiceV4 {

    public String work(String req) {
        return req + "/asyncWork";
    }

}

@Slf4j
@RestController
@RequestMapping("/v4")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV4 {
    public static final String Service1Uri = "http://localhost:9090/remote-service?req={req}";
    public static final String Service2Uri = "http://localhost:9090/remote-service2?req={req}";
    private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();

    // Use Netty, configured to run on a single thread
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );

    private final MyServiceV4 myServiceV4;

    public SpringAsyncControllerV4(MyServiceV4 myServiceV4) {
        this.myServiceV4 = myServiceV4;
    }

    /**
     * Case where each call depends on the result of the previous one
     */
    @GetMapping("/rest4")
    public DeferredResult<String> rest4(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        toCompletableFuture(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
                .thenCompose(res ->
                        toCompletableFuture(asyncRestTemplate2.getForEntity(Service2Uri, String.class, res.getBody()))
                )
                .thenApplyAsync(res ->
                        myServiceV4.work(res.getBody())
                )
                .thenAccept(res -> dr.setResult(res))
                .exceptionally(ex -> { dr.setErrorResult(ex.getMessage()); return null; });

        return dr;
    }

    <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();

        lf.addCallback(
                res -> completableFuture.complete(res),
                ex -> completableFuture.completeExceptionally(ex)
        );
        return completableFuture;

    }
}

=> With CompletableFuture, even synchronous service logic can be executed asynchronously (here via thenApplyAsync).

 

* REF

https://www.youtube.com/watch?v=PzxV-bmLSFY&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=7
