본문 바로가기

Java/concurrent

[Java-concurrent] CompletableFuture 톺아보기 - AsyncSupply

static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; 
    Supplier<? extends T> fn;
    
    AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return false; }

    public void run() {
        // 1. 전역 변수를 지역 변수에 옮기고 전역 변수 참조 해제
        CompletableFuture<T> d; 
        Supplier<? extends T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; 
            fn = null;
            // 2. 결과가 아직 없으면
            if (d.result == null) {
                try {
                    // 3. CompletableFuture 완료 시도
                    // 궁극적으로 result값을 업데이트
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            // 4. 이후 실행될 작업을 실행
            d.postComplete();
        }
    }
  }

 

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

	volatile Object result;       // Either the result or boxed AltResult
	// postComplete를 check
	// treiber : 운전자 (driver)
	volatile Completion stack;    // Top of Treiber stack of dependent actions
		

	static final AltResult NIL = new AltResult(null);
	...
	// 2. CompletableFutere의 Result 필드가 null 일 때
	// t가 null이라면 AltResult | t
	final boolean completeValue(T t) {
	    return RESULT.compareAndSet(this, null, (t == null) ? NIL : t);
	}
	...
  private static final VarHandle RESULT;
  private static final VarHandle STACK;
  private static final VarHandle NEXT;
  
  static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        // 1. CompletableFuture 클래스의 result 필드 VarHandle
        RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class);
        STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class);
        NEXT = l.findVarHandle(Completion.class, "next", Completion.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
  }
}

 

/* 주석에 나와있는 것 처럼 '변수 f'에는 현재의 dependents를 갖는다
* 멀티 스레드에서의 안정성을 위해 한 번에 한가지 path로만 확장되고, 
* 무한재귀를 피하기 위해 다른 path는 밀어낸다
*/
final void postComplete() {
        // 1. 현재 클래스를 참조
        CompletableFuture<?> f = this; 
        Completion h;
        /* 2
        * 최초로 참조된 CompletableFuture의 stack이 남아있거나
        * 다른 참조로 넘어가고, 해당 참조의 stack이 남아있는 동안 실행
        */
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {

            CompletableFuture<?> d; 
            Completion t;

            // 3. 현재 노드(h)를 next로 교체
            if (STACK.compareAndSet(f, h, t = h.next)) {
                if (t != null) {
                    // 4. 최초로 참조된 CompletableFuture가 아니면 현재 stack에 push
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    NEXT.compareAndSet(h, t, null); // try to detach
                }
                // 5. 종속된 작업 실행 후 반환된 인스턴스로 업데이트
                // null일 경우 현재 객체
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }
}
 
// 성공할 때까지 push 시도를 한다
final void pushStack(Completion c) {
  do {} while (!tryPushStack(c));
}

// 주어진 Completion(c)을 현재 인스턴스의 stack에 push
final boolean tryPushStack(Completion c) {
  Completion h = stack;
  // NEXT(VarHandle) : Completion의 volatile Completion 'next'필드 핸들링
  // volatile Completion을 변경 [c(주어진 completion) -> h(현재 인스턴스의 completion)]
  NEXT.set(c, h);         // CAS piggyback
  return STACK.compareAndSet(this, h, c);
}

'Java > concurrent' 카테고리의 다른 글

[Java-concurrent] CompletableFuture 톺아보기 - OT  (0) 2024.03.19