@@ -6,6 +6,7 @@ import graphql.execution.ResultPath
66import graphql.execution.pubsub.CapturingSubscriber
77import graphql.incremental.DelayedIncrementalPartialResult
88import org.awaitility.Awaitility
9+ import org.reactivestreams.Publisher
910import spock.lang.Specification
1011
1112import java.util.concurrent.CompletableFuture
@@ -58,7 +59,7 @@ class IncrementalCallStateDeferTest extends Specification{
5859 incrementalCallState. enqueue(offThread(" C" , 10 , " /field/path" ))
5960
6061when :
61- def subscriber = new graphql.execution.pubsub. CapturingSubscriber<DelayedIncrementalPartialResult > (){
62+ def subscriber = new CapturingSubscriber<DelayedIncrementalPartialResult > (){
6263@Override
6364void onComplete (){
6465assert false , " This should not be called!"
@@ -84,7 +85,7 @@ class IncrementalCallStateDeferTest extends Specification{
8485 incrementalCallState. enqueue(offThread(" C" , 10 , " /field/path" )) // <-- will finish first
8586
8687when :
87- def subscriber = new graphql.execution.pubsub. CapturingSubscriber<DelayedIncrementalPartialResult > (){
88+ def subscriber = new CapturingSubscriber<DelayedIncrementalPartialResult > (){
8889@Override
8990void onNext (DelayedIncrementalPartialResult executionResult ){
9091this . getEvents(). add(executionResult)
@@ -113,8 +114,8 @@ class IncrementalCallStateDeferTest extends Specification{
113114 incrementalCallState. enqueue(offThread(" C" , 10 , " /field/path" )) // <-- will finish first
114115
115116when :
116- def subscriber1 = new graphql.execution.pubsub. CapturingSubscriber<DelayedIncrementalPartialResult > ()
117- def subscriber2 = new graphql.execution.pubsub. CapturingSubscriber<DelayedIncrementalPartialResult > ()
117+ def subscriber1 = new CapturingSubscriber<DelayedIncrementalPartialResult > ()
118+ def subscriber2 = new CapturingSubscriber<DelayedIncrementalPartialResult > ()
118119 incrementalCallState. startDeferredCalls(). subscribe(subscriber1)
119120 incrementalCallState. startDeferredCalls(). subscribe(subscriber2)
120121
@@ -196,7 +197,38 @@ class IncrementalCallStateDeferTest extends Specification{
196197 results. any{it. incremental[0 ]. data[" c" ] == " C" }
197198 }
198199
200+ def " nothing happens until the publisher is subscribed to" (){
201+
202+ def startingValue = " *"
203+ given :
204+ def incrementalCallState = new IncrementalCallState ()
205+ incrementalCallState. enqueue(offThread({-> startingValue + " A" }, 100 , " /field/path" )) // <-- will finish last
206+ incrementalCallState. enqueue(offThread({-> startingValue + " B" }, 50 , " /field/path" )) // <-- will finish second
207+ incrementalCallState. enqueue(offThread({-> startingValue + " C" }, 10 , " /field/path" )) // <-- will finish first
208+
209+ when :
210+
211+ // get the publisher but not work has been done here
212+ def publisher = incrementalCallState. startDeferredCalls()
213+ // we are changing a side effect after the publisher is created
214+ startingValue = " _"
215+
216+ // subscription wll case the queue publisher to start draining the queue
217+ List<DelayedIncrementalPartialResult > results = subscribeAndWaitCalls(publisher)
218+
219+ then :
220+ assertResultsSizeAndHasNextRule(3 , results)
221+ results[0 ]. incremental[0 ]. data[" _c" ] == " _C"
222+ results[1 ]. incremental[0 ]. data[" _b" ] == " _B"
223+ results[2 ]. incremental[0 ]. data[" _a" ] == " _A"
224+ }
225+
226+
199227private static DeferredFragmentCall offThread (String data , int sleepTime , String path ){
228+ offThread(() -> data, sleepTime, path)
229+ }
230+
231+ private static DeferredFragmentCall offThread (Supplier<String > dataSupplier , int sleepTime , String path ){
200232def callSupplier = new Supplier<CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult > > (){
201233@Override
202234CompletableFuture<DeferredFragmentCall.FieldWithExecutionResult > get (){
@@ -205,6 +237,7 @@ class IncrementalCallStateDeferTest extends Specification{
205237if (data == " Bang" ){
206238throw new RuntimeException (data)
207239 }
240+ String data = dataSupplier. get()
208241new DeferredFragmentCall.FieldWithExecutionResult (data. toLowerCase(), new ExecutionResultImpl (data, []))
209242 })
210243 }
@@ -240,10 +273,13 @@ class IncrementalCallStateDeferTest extends Specification{
240273 }
241274
242275private static List<DelayedIncrementalPartialResult > startAndWaitCalls (IncrementalCallState incrementalCallState ){
243- def subscriber = new CapturingSubscriber< DelayedIncrementalPartialResult > ()
244-
245- incrementalCallState . startDeferredCalls() . subscribe(subscriber)
276+ def publisher = incrementalCallState . startDeferredCalls ()
277+ return subscribeAndWaitCalls(publisher)
278+ }
246279
280+ private static List<DelayedIncrementalPartialResult > subscribeAndWaitCalls (Publisher<DelayedIncrementalPartialResult > publisher ){
281+ def subscriber = new CapturingSubscriber<DelayedIncrementalPartialResult > ()
282+ publisher. subscribe(subscriber)
247283Awaitility . await(). untilTrue(subscriber. isDone())
248284return subscriber. getEvents()
249285 }
0 commit comments