본문 바로가기

Reactive-Programming

[Spring] Reactive Web (3)

Callback Hell 개선

| 코드

// Callback-hell example: three dependent asynchronous calls, each one started
// from inside the success callback of the previous call.
future.addCallback(data -> {
    // Second call is built from the body of the first response.
    ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
            Service2Uri, String.class, data.getBody()
    );
    future2.addCallback(data2 -> {
        // Third call consumes the second response (data2), not the first one.
        ListenableFuture<String> future3 = myServiceV2.work(data2.getBody());
        future3.addCallback(
                data3 -> dr.setResult(data3),
                ex -> dr.setErrorResult(ex)
        );
    }, ex -> {
        // Same error handling repeated at every nesting level.
        dr.setErrorResult(ex);
    });
}, ex -> {
    dr.setErrorResult(ex);
});

=> 연속적인 Callback 호출로 코드를 파악하기 복잡하고 가독성이 떨어진다.

=> Error 처리의 중복


개선 1.

** 클래스를 생성하여 메서드로 분리한다.

** API 호출 개선

/**
 * One link in a chain of asynchronous steps built on {@link ListenableFuture}.
 *
 * <p>Each link holds a reference to the link that follows it and forwards a
 * successful result to that link through {@link #complete}.
 */
@SuppressWarnings("deprecation")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Completion {

    /** The link that receives this link's result; {@code null} until one is attached. */
    Completion next;

    /**
     * Attaches a consumer as the following link. Returns nothing, so no
     * further link can be chained after it.
     *
     * @param consumer receives the value passed on by this link
     */
    public void andAccept(Consumer<ResponseEntity<String>> consumer) {
        this.next = new AcceptCompletion(consumer);
    }

    /**
     * Attaches a link that starts another asynchronous call from this link's result.
     *
     * @param function maps the received value to the next future
     * @return the newly attached link, so the chain can continue from it
     */
    public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function) {
        ApplyCompletion applied = new ApplyCompletion(function);
        this.next = applied;
        return applied;
    }

    /**
     * Creates the head of a chain and wires it to the given future.
     *
     * <p>NOTE(review): the callbacks are registered before any following link
     * is attached; if the future can complete before {@code andApply} /
     * {@code andAccept} is called, {@link #complete} would find {@code next == null}
     * and the result would be dropped - confirm against the future implementation in use.
     *
     * @param lf the future whose outcome starts the chain
     * @return the head link
     */
    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
        Completion head = new Completion();
        lf.addCallback(head::complete, head::error);
        return head;
    }

    /**
     * Receives the value from the previous link. The base implementation does
     * nothing; subclasses override it.
     *
     * @param value the value passed on by the previous link
     */
    public void run(ResponseEntity<String> value) {
        // intentionally empty
    }

    /**
     * Passes a successful result to the following link, if there is one.
     *
     * @param res the result to pass on
     */
    public void complete(ResponseEntity<String> res) {
        if (next == null) {
            return;
        }
        next.run(res);
    }

    /**
     * Failure hook. Not implemented in this version: the error is ignored.
     *
     * @param ex the failure reported by the future
     */
    public void error(Throwable ex) {
        // error handling
    }
}

/**
 * Link that hands the received value to a {@link Consumer}.
 *
 * <p>{@link #run} only invokes the consumer; it does not forward anything to
 * a following link.
 */
public class AcceptCompletion extends Completion {

    /** Callback that receives the value; assigned once in the constructor. */
    final Consumer<ResponseEntity<String>> consumer;

    /**
     * @param consumer callback invoked with the value passed to {@link #run}
     */
    public AcceptCompletion(Consumer<ResponseEntity<String>> consumer) {
        this.consumer = consumer;
    }

    /**
     * Invokes the consumer with the given value.
     *
     * @param value the value passed on by the previous link
     */
    @Override
    public void run(ResponseEntity<String> value) {
        consumer.accept(value);
    }
}

public class ApplyCompletion extends Completion {

    Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function;

    public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function) {
        this.function = function;
    }

    @Override
    public void run(ResponseEntity<String> value) {
        ListenableFuture<ResponseEntity<String>> lf = function.apply(value);
        lf.addCallback(res -> complete(res), ex -> error(ex));
    }
}
// Usage: start from the service-1 call, feed its response body into the
// service-2 call, then hand the final response body to dr.setResult.
Completion
        .from(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
        .andApply(res -> asyncRestTemplate.getForEntity(Service2Uri, String.class, res.getBody()))
        .andAccept(res -> dr.setResult(res.getBody()));

=> 다음과 같이 간단하게 작성할 수 있다.


개선 2.

** 에러 처리 중복 제거

/**
 * One link in a chain of asynchronous steps built on {@link ListenableFuture}.
 *
 * <p>A successful result is forwarded to the following link through
 * {@link #complete}; a failure is forwarded through {@link #error}.
 */
@SuppressWarnings("deprecation")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Completion {

    /** The link that receives this link's outcome; {@code null} until one is attached. */
    Completion next;

    /**
     * Attaches an error-handling link.
     *
     * @param consumer receives the failure forwarded along the chain
     * @return the newly attached link, so the chain can continue from it
     */
    public Completion andError(Consumer<Throwable> consumer) {
        ErrorCompletion handler = new ErrorCompletion(consumer);
        this.next = handler;
        return handler;
    }

    /**
     * Attaches a consumer as the following link. Returns nothing, so no
     * further link can be chained after it.
     *
     * @param consumer receives the value passed on by this link
     */
    public void andAccept(Consumer<ResponseEntity<String>> consumer) {
        this.next = new AcceptCompletion(consumer);
    }

    /**
     * Attaches a link that starts another asynchronous call from this link's result.
     *
     * @param function maps the received value to the next future
     * @return the newly attached link, so the chain can continue from it
     */
    public Completion andApply(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function) {
        ApplyCompletion applied = new ApplyCompletion(function);
        this.next = applied;
        return applied;
    }

    /**
     * Creates the head of a chain and wires it to the given future.
     *
     * <p>NOTE(review): the callbacks are registered before any following link
     * is attached; if the future can complete before the chain is fully built,
     * the outcome would reach a link whose {@code next} is still {@code null}
     * and be dropped - confirm against the future implementation in use.
     *
     * @param lf the future whose outcome starts the chain
     * @return the head link
     */
    public static Completion from(ListenableFuture<ResponseEntity<String>> lf) {
        Completion head = new Completion();
        lf.addCallback(head::complete, head::error);
        return head;
    }

    /**
     * Receives the value from the previous link. The base implementation does
     * nothing; subclasses override it.
     *
     * @param value the value passed on by the previous link
     */
    public void run(ResponseEntity<String> value) {
        // intentionally empty
    }

    /**
     * Passes a successful result to the following link, if there is one.
     *
     * @param res the result to pass on
     */
    public void complete(ResponseEntity<String> res) {
        if (next == null) {
            return;
        }
        next.run(res);
    }

    /**
     * Passes a failure to the following link, if there is one.
     *
     * @param ex the failure to pass on
     */
    public void error(Throwable ex) {
        if (next == null) {
            return;
        }
        next.error(ex);
    }
}

/**
 * Link that hands the received value to a {@link Consumer}.
 *
 * <p>{@link #run} only invokes the consumer; it does not forward anything to
 * a following link.
 */
public class AcceptCompletion extends Completion {

    /** Callback that receives the value; assigned once in the constructor. */
    final Consumer<ResponseEntity<String>> consumer;

    /**
     * @param consumer callback invoked with the value passed to {@link #run}
     */
    public AcceptCompletion(Consumer<ResponseEntity<String>> consumer) {
        this.consumer = consumer;
    }

    /**
     * Invokes the consumer with the given value.
     *
     * @param value the value passed on by the previous link
     */
    @Override
    public void run(ResponseEntity<String> value) {
        consumer.accept(value);
    }
}

public class ApplyCompletion extends Completion {

    Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function;

    public ApplyCompletion(Function<ResponseEntity<String>, ListenableFuture<ResponseEntity<String>>> function) {
        this.function = function;
    }

    @Override
    public void run(ResponseEntity<String> value) {
        ListenableFuture<ResponseEntity<String>> lf = function.apply(value);
        lf.addCallback(res -> complete(res), ex -> error(ex));
    }
}

/**
 * Link that handles failures for the chain.
 *
 * <p>Successful values pass straight through to the following link; a failure
 * is given to the consumer and is not forwarded any further.
 */
public class ErrorCompletion extends Completion {

    /** Callback that receives the failure; assigned once in the constructor. */
    final Consumer<Throwable> consumer;

    /**
     * @param consumer callback invoked with the failure passed to {@link #error}
     */
    public ErrorCompletion(Consumer<Throwable> consumer) {
        this.consumer = consumer;
    }

    /**
     * Forwards the value unchanged to the following link, if there is one.
     *
     * @param value the value passed on by the previous link
     */
    @Override
    public void run(ResponseEntity<String> value) {
        if (next == null) {
            return;
        }
        next.run(value);
    }

    /**
     * Invokes the consumer with the failure. Unlike the inherited
     * implementation, the failure is not passed on to {@code next}.
     *
     * @param ex the failure forwarded along the chain
     */
    @Override
    public void error(Throwable ex) {
        consumer.accept(ex);
    }
}

=> Error 처리를 위한 ErrorCompletion 구현

// Usage: same chain as before, with a single andError link that reports a
// failure from either call through dr.setErrorResult before the final andAccept.
Completion
        .from(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
        .andApply(res -> asyncRestTemplate.getForEntity(Service2Uri, String.class, res.getBody()))
        .andError(ex -> dr.setErrorResult(ex.toString()))
        .andAccept(res -> dr.setResult(res.getBody()));

=> andError를 호출함으로써 오류가 발생하면 ErrorCompletion의 error 메서드에서 setErrorResult를 실행하고, 오류를 다음 Completion으로 전달하지 않기 때문에 이후 동작(andAccept)은 실행되지 않고 종료한다. 정상 응답일 때는 run이 값을 그대로 다음 Completion으로 넘긴다.


개선 3.

** 제네릭 적용

/**
 * One link in a chain of asynchronous steps built on {@link ListenableFuture}.
 *
 * <p>A successful result is forwarded to the following link through
 * {@link #complete}; a failure is forwarded through {@link #error}.
 *
 * @param <S> type of the value this link receives in {@link #run}
 * @param <T> type of the result this link passes on in {@link #complete}
 */
@SuppressWarnings("deprecation")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Completion<S, T> {

    /**
     * The link that receives this link's outcome; {@code null} until one is attached.
     * Typed so that the following link must accept this link's result type
     * (previously a raw type, which made {@code next.run(res)} unchecked).
     */
    Completion<T, ?> next;

    /**
     * Attaches an error-handling link that passes successful values through unchanged.
     *
     * @param consumer receives the failure forwarded along the chain
     * @return the newly attached link, so the chain can continue from it
     */
    public Completion<T, T> andError(Consumer<Throwable> consumer) {
        ErrorCompletion<T> handler = new ErrorCompletion<>(consumer);
        this.next = handler;
        return handler;
    }

    /**
     * Attaches a consumer as the following link. Returns nothing, so no
     * further link can be chained after it.
     *
     * @param consumer receives the value passed on by this link
     */
    public void andAccept(Consumer<T> consumer) {
        this.next = new AcceptCompletion<T>(consumer);
    }

    /**
     * Attaches a link that starts another asynchronous call from this link's result.
     *
     * @param function maps the received value to the next future
     * @param <V> result type of the next future
     * @return the newly attached link, so the chain can continue from it
     */
    public <V> Completion<T, V> andApply(Function<T, ListenableFuture<V>> function) {
        ApplyCompletion<T, V> applied = new ApplyCompletion<>(function);
        this.next = applied;
        return applied;
    }

    /**
     * Creates the head of a chain and wires it to the given future.
     *
     * <p>NOTE(review): the callbacks are registered before any following link
     * is attached; if the future can complete before the chain is fully built,
     * the outcome would reach a link whose {@code next} is still {@code null}
     * and be dropped - confirm against the future implementation in use.
     *
     * @param lf the future whose outcome starts the chain
     * @param <S> input type of the head link; its {@code run} is never used here
     * @param <T> result type of the future
     * @return the head link
     */
    public static <S, T> Completion<S, T> from(ListenableFuture<T> lf) {
        Completion<S, T> head = new Completion<>();
        lf.addCallback(head::complete, head::error);
        return head;
    }

    /**
     * Receives the value from the previous link. The base implementation does
     * nothing; subclasses override it.
     *
     * @param value the value passed on by the previous link
     */
    public void run(S value) {
        // intentionally empty
    }

    /**
     * Passes a successful result to the following link, if there is one.
     *
     * @param res the result to pass on
     */
    public void complete(T res) {
        if (next != null) {
            next.run(res);
        }
    }

    /**
     * Passes a failure to the following link, if there is one.
     *
     * @param ex the failure to pass on
     */
    public void error(Throwable ex) {
        if (next != null) {
            next.error(ex);
        }
    }
}

/**
 * Link that hands the received value to a {@link Consumer}.
 *
 * <p>{@link #run} only invokes the consumer; it does not forward anything to
 * a following link.
 *
 * @param <S> type of the value received
 */
public class AcceptCompletion<S> extends Completion<S, Void> {

    /** Callback that receives the value; assigned once in the constructor. */
    final Consumer<S> consumer;

    /**
     * @param consumer callback invoked with the value passed to {@link #run}
     */
    public AcceptCompletion(Consumer<S> consumer) {
        this.consumer = consumer;
    }

    /**
     * Invokes the consumer with the given value.
     *
     * @param value the value passed on by the previous link
     */
    @Override
    public void run(S value) {
        consumer.accept(value);
    }
}

public class ApplyCompletion<S, T> extends Completion<S, T> {

    Function<S, ListenableFuture<T>> function;

    public ApplyCompletion(Function<S, ListenableFuture<T>> function) {
        this.function = function;
    }

    @Override
    public void run(S value) {
        ListenableFuture<T> lf = function.apply(value);
        lf.addCallback(res -> complete(res), ex -> error(ex));
    }
}

/**
 * Link that handles failures for the chain.
 *
 * <p>Successful values pass straight through to the following link; a failure
 * is given to the consumer and is not forwarded any further.
 *
 * @param <T> type of the value passed through unchanged
 */
public class ErrorCompletion<T> extends Completion<T, T> {

    /** Callback that receives the failure; assigned once in the constructor. */
    final Consumer<Throwable> consumer;

    /**
     * @param consumer callback invoked with the failure passed to {@link #error}
     */
    public ErrorCompletion(Consumer<Throwable> consumer) {
        this.consumer = consumer;
    }

    /**
     * Forwards the value unchanged to the following link, if there is one.
     *
     * @param value the value passed on by the previous link
     */
    @Override
    public void run(T value) {
        if (next == null) {
            return;
        }
        next.run(value);
    }

    /**
     * Invokes the consumer with the failure. Unlike the inherited
     * implementation, the failure is not passed on to {@code next}.
     *
     * @param ex the failure forwarded along the chain
     */
    @Override
    public void error(Throwable ex) {
        consumer.accept(ex);
    }
}
// Usage with the generic version: the third step returns a future from
// myServiceV2.work rather than from the REST template, and the final andAccept
// passes that step's result directly to dr.setResult.
Completion
        .from(asyncRestTemplate2.getForEntity(Service1Uri, String.class, "service1 " + idx))
        .andApply(res -> asyncRestTemplate2.getForEntity(Service2Uri, String.class, res.getBody()))
        .andApply(res -> myServiceV2.work(res.getBody()))
        .andError(ex -> dr.setErrorResult(ex.toString()))
        .andAccept(res -> dr.setResult(res));

 

* REF

https://www.youtube.com/watch?v=Tb43EyWTSlQ&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=6

'Reactive-Programming' 카테고리의 다른 글

[Spring] WebFlux란?  (0) 2022.10.02
[Spring] Reactive Web (4)  (0) 2022.10.01
[Spring] Reactive Web (2)  (0) 2022.09.27
[Spring] Reactive Web (1)  (0) 2022.09.25
[Java] 비동기 기술  (0) 2022.09.24