Skip to content

Commit 719e4de

Browse files
committed
Allow to pass an Executor when converting Futures
This is to circumvent #179.
1 parent 0478d7d commit 719e4de

File tree

2 files changed

+61
-17
lines changed

2 files changed

+61
-17
lines changed

‎src/main/scala/scala/compat/java8/FutureConverters.scala‎

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import scala.language.implicitConversions
1717
importscala.concurrent.java8.FuturesConvertersImpl._
1818
importscala.concurrent.java8.FuturesConvertersImplCompat._
1919
importscala.concurrent.{Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor }
20-
importjava.util.concurrent.{CompletionStage, Executor, ExecutorService }
20+
importjava.util.concurrent.{CompletionStage, Executor, ExecutorService, ForkJoinPool }
2121
importjava.util.function.Consumer
2222

2323
/**
@@ -59,16 +59,38 @@ object FutureConverters{
5959
* transformations to their asynchronous counterparts, i.e.
6060
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
6161
*
62+
* Callbacks will run on ForkJoinPool.commonPool(), unless it does not
63+
* support a parallelism level of at least two, in which case a new Thread
64+
* is used.
65+
*
6266
* @paramf The Scala Future which may eventually supply the completion for
6367
* the returned CompletionStage
6468
* @return a CompletionStage that runs all callbacks asynchronously and does
6569
* not support the CompletableFuture interface
6670
*/
67-
deftoJava[T](f: Future[T]):CompletionStage[T] ={
71+
deftoJava[T](f: Future[T]):CompletionStage[T] = toJava(f, ForkJoinPool.commonPool())
72+
73+
/**
74+
* Returns a CompletionStage that will be completed with the same value or
75+
* exception as the given Scala Future when that completes. Since the Future is a read-only
76+
* representation, this CompletionStage does not support the
77+
* <code>toCompletableFuture</code> method. The semantics of Scala Future
78+
* demand that all callbacks are invoked asynchronously by default, therefore
79+
* the returned CompletionStage routes all calls to synchronous
80+
* transformations to their asynchronous counterparts, i.e.
81+
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
82+
*
83+
* @paramf The Scala Future which may eventually supply the completion for
84+
* the returned CompletionStage
85+
* @parame The Java Executor onto which schedule the callbacks
86+
* @return a CompletionStage that runs all callbacks asynchronously and does
87+
* not support the CompletableFuture interface
88+
*/
89+
deftoJava[T](f: Future[T], e: Executor):CompletionStage[T] ={
6890
f match{
6991
casep: P[T@unchecked] => p.wrapped
7092
case _ =>
71-
valcf=newCF[T](f)
93+
valcf=newCF[T](f, e)
7294
implicitvalec=InternalCallbackExecutor
7395
f onComplete cf
7496
cf
@@ -189,10 +211,30 @@ object FutureConverters{
189211
* transformations to their asynchronous counterparts, i.e.
190212
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
191213
*
214+
* Callbacks will run on ForkJoinPool.commonPool(), unless it does not
215+
* support a parallelism level of at least two, in which case a new Thread
216+
* is used.
217+
*
192218
* @return a CompletionStage that runs all callbacks asynchronously and does
193219
* not support the CompletableFuture interface
194220
*/
195221
deftoJava:CompletionStage[T] =FutureConverters.toJava(__self)
222+
223+
/**
224+
* Returns a CompletionStage that will be completed with the same value or
225+
* exception as the given Scala Future when that completes. Since the Future is a read-only
226+
* representation, this CompletionStage does not support the
227+
* <code>toCompletableFuture</code> method. The semantics of Scala Future
228+
* demand that all callbacks are invoked asynchronously by default, therefore
229+
* the returned CompletionStage routes all calls to synchronous
230+
* transformations to their asynchronous counterparts, i.e.
231+
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
232+
*
233+
* @parame The Java Executor onto which schedule the callbacks
234+
* @return a CompletionStage that runs all callbacks asynchronously and does
235+
* not support the CompletableFuture interface
236+
*/
237+
deftoJava(e: Executor):CompletionStage[T] =FutureConverters.toJava(__self, e)
196238
}
197239

198240
implicitdefCompletionStageOps[T](cs: CompletionStage[T]):CompletionStageOps[T] =newCompletionStageOps(cs)

‎src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala‎

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import scala.util.{Failure, Success, Try}
2323

2424
// TODO: make this private[scala] when genjavadoc allows for that.
2525
objectFuturesConvertersImpl{
26-
classCF[T](valwrapped:Future[T]) extendsCompletableFuture[T] with (Try[T] =>Unit){
26+
classCF[T](valwrapped:Future[T], executor: Executor) extendsCompletableFuture[T] with (Try[T] =>Unit){
27+
defthis(wrapped: Future[T]) =this(wrapped, ForkJoinPool.commonPool())
28+
2729
overridedefapply(t: Try[T]):Unit= t match{
2830
caseSuccess(v) => complete(v)
2931
caseFailure(e) => completeExceptionally(e)
@@ -32,29 +34,29 @@ object FuturesConvertersImpl{
3234
/*
3335
* Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
3436
*/
35-
overridedefthenApply[U](fn: JF[_ >:T, _ <:U]):CompletableFuture[U] = thenApplyAsync(fn)
37+
overridedefthenApply[U](fn: JF[_ >:T, _ <:U]):CompletableFuture[U] = thenApplyAsync(fn, executor)
3638

37-
overridedefthenAccept(fn: Consumer[_ >:T]):CompletableFuture[Void] = thenAcceptAsync(fn)
39+
overridedefthenAccept(fn: Consumer[_ >:T]):CompletableFuture[Void] = thenAcceptAsync(fn, executor)
3840

39-
overridedefthenRun(fn: Runnable):CompletableFuture[Void] = thenRunAsync(fn)
41+
overridedefthenRun(fn: Runnable):CompletableFuture[Void] = thenRunAsync(fn, executor)
4042

41-
overridedefthenCombine[U, V](cs: CompletionStage[_ <:U], fn: BiFunction[_ >:T, _ >:U, _ <:V]):CompletableFuture[V] = thenCombineAsync(cs, fn)
43+
overridedefthenCombine[U, V](cs: CompletionStage[_ <:U], fn: BiFunction[_ >:T, _ >:U, _ <:V]):CompletableFuture[V] = thenCombineAsync(cs, fn, executor)
4244

43-
overridedefthenAcceptBoth[U](cs: CompletionStage[_ <:U], fn: BiConsumer[_ >:T, _ >:U]):CompletableFuture[Void] = thenAcceptBothAsync(cs, fn)
45+
overridedefthenAcceptBoth[U](cs: CompletionStage[_ <:U], fn: BiConsumer[_ >:T, _ >:U]):CompletableFuture[Void] = thenAcceptBothAsync(cs, fn, executor)
4446

45-
overridedefrunAfterBoth(cs: CompletionStage[_], fn: Runnable):CompletableFuture[Void] = runAfterBothAsync(cs, fn)
47+
overridedefrunAfterBoth(cs: CompletionStage[_], fn: Runnable):CompletableFuture[Void] = runAfterBothAsync(cs, fn, executor)
4648

47-
overridedefapplyToEither[U](cs: CompletionStage[_ <:T], fn: JF[_ >:T, U]):CompletableFuture[U] = applyToEitherAsync(cs, fn)
49+
overridedefapplyToEither[U](cs: CompletionStage[_ <:T], fn: JF[_ >:T, U]):CompletableFuture[U] = applyToEitherAsync(cs, fn, executor)
4850

49-
overridedefacceptEither(cs: CompletionStage[_ <:T], fn: Consumer[_ >:T]):CompletableFuture[Void] = acceptEitherAsync(cs, fn)
51+
overridedefacceptEither(cs: CompletionStage[_ <:T], fn: Consumer[_ >:T]):CompletableFuture[Void] = acceptEitherAsync(cs, fn, executor)
5052

51-
overridedefrunAfterEither(cs: CompletionStage[_], fn: Runnable):CompletableFuture[Void] = runAfterEitherAsync(cs, fn)
53+
overridedefrunAfterEither(cs: CompletionStage[_], fn: Runnable):CompletableFuture[Void] = runAfterEitherAsync(cs, fn, executor)
5254

53-
overridedefthenCompose[U](fn: JF[_ >:T, _ <:CompletionStage[U]]):CompletableFuture[U] = thenComposeAsync(fn)
55+
overridedefthenCompose[U](fn: JF[_ >:T, _ <:CompletionStage[U]]):CompletableFuture[U] = thenComposeAsync(fn, executor)
5456

55-
overridedefwhenComplete(fn: BiConsumer[_ >:T, _ >:Throwable]):CompletableFuture[T] = whenCompleteAsync(fn)
57+
overridedefwhenComplete(fn: BiConsumer[_ >:T, _ >:Throwable]):CompletableFuture[T] = whenCompleteAsync(fn, executor)
5658

57-
overridedefhandle[U](fn: BiFunction[_ >:T, Throwable, _ <:U]):CompletableFuture[U] = handleAsync(fn)
59+
overridedefhandle[U](fn: BiFunction[_ >:T, Throwable, _ <:U]):CompletableFuture[U] = handleAsync(fn, executor)
5860

5961
overridedefexceptionally(fn: JF[Throwable, _ <:T]):CompletableFuture[T] ={
6062
valcf=newCompletableFuture[T]
@@ -71,7 +73,7 @@ object FuturesConvertersImpl{
7173
if (n ne this) cf.complete(n.asInstanceOf[T])
7274
}
7375
}
74-
})
76+
}, executor)
7577
cf
7678
}
7779

0 commit comments

Comments
(0)