Transform Java Future into a CompletableFuture
Java 8 introduces CompletableFuture, a new implementation of Future that is composable (includes a bunch of thenXxx methods). I'd like to use this exclusively, but many of the libraries I want to use return only non-composable Future instances.
Is there a way to wrap a returned Future instance inside a CompletableFuture so that I can compose it?
Solution 1:[1]
If the library you want to use also offers a callback style method in addition to the Future style, you can provide it a handler that completes the CompletableFuture without any extra thread blocking. Like so:
AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
// ...
CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
completableFuture.complete(buffer);
}
@Override
public void failed(Throwable exc, Void attachment) {
completableFuture.completeExceptionally(exc);
}
});
completableFuture.thenApply(...)
Without a callback, the only other way I see to solve this is a polling loop that runs all your Future.isDone() checks on a single thread and invokes complete whenever a Future is ready to get.
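For the callback route described in this answer, here is a self-contained sketch that can be run as-is. The class name AsyncFileReads and the method read(...) are made up for illustration; only AsynchronousFileChannel and CompletionHandler come from the JDK. It assumes a single read of at most `capacity` bytes is enough, which a real reader would have to loop on.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: adapts the callback-style read to a CompletableFuture.
final class AsyncFileReads {

    // Reads up to 'capacity' bytes starting at 'position'.
    // The returned buffer is flipped, i.e. ready to be read from.
    static CompletableFuture<ByteBuffer> read(Path file, long position, int capacity) {
        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
        AsynchronousFileChannel channel;
        try {
            channel = AsynchronousFileChannel.open(file, StandardOpenOption.READ);
        } catch (IOException e) {
            // Opening failed: report it through the future instead of throwing.
            result.completeExceptionally(e);
            return result;
        }
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        channel.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer bytesRead, Void attachment) {
                buffer.flip();
                closeQuietly();
                result.complete(buffer);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                closeQuietly();
                result.completeExceptionally(exc);
            }

            private void closeQuietly() {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // Nothing useful to do if closing fails.
                }
            }
        });
        return result;
    }
}
```

No thread is parked waiting for the read; the channel's own completion thread finishes the future, and any thenApply stages chained on it run from there.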
Solution 2:[2]
If your Future is the result of a call to an ExecutorService method (e.g. submit()), the easiest would be to use the CompletableFuture.runAsync(Runnable, Executor) method instead.
From
Runnable myTask = ... ;
Future<?> future = myExecutor.submit(myTask);
to
Runnable myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);
The CompletableFuture is then created "natively".
EDIT: Pursuing comments by @SamMefford corrected by @MartinAndersson, if you want to pass a Callable, you need to call supplyAsync(), converting the Callable<T> into a Supplier<T>, e.g. with:
CompletableFuture.supplyAsync(() -> {
try { return myCallable.call(); }
catch (Exception ex) { throw new CompletionException(ex); } // Or return default value
}, myExecutor);
Because T Callable.call() throws Exception declares a checked exception and T Supplier.get() doesn't, you have to catch the exception so that the two signatures are compatible.
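If you need this conversion in more than one place, it can be pulled into a small helper. This is only a sketch; the names AsyncCalls and callAsync are invented here and are not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical helper: runs a Callable on the given executor via supplyAsync.
final class AsyncCalls {

    static <T> CompletableFuture<T> callAsync(Callable<T> callable, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (Exception e) {
                // Supplier.get() cannot throw checked exceptions, so wrap them.
                throw new CompletionException(e);
            }
        }, executor);
    }
}
```

With this wrapping, join() throws a CompletionException whose cause is the original exception, and get() throws an ExecutionException with that same cause.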
A note on exception handling
Supplier.get() has no throws clause, which means it cannot throw a checked exception. However, unchecked exceptions can be used. The code in CompletableFuture shows that CompletionException is used and is unchecked (i.e. is a RuntimeException), hence the catch/throw wrapping any exception into a CompletionException.
Also, as @WeGa indicated, you can use the handle() method to deal with exceptions potentially being thrown by the result:
CompletableFuture<T> future = CompletableFuture.supplyAsync(...);
future.handle((res, ex) -> { // note the order: result first, exception second
if (ex != null) {
// An exception occurred ...
return null; // or a fallback value
} else {
// No exception was thrown, 'res' is valid and can be handled here
return res;
}
});
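One detail worth knowing when using handle(): if the failure came from inside supplyAsync or an earlier stage, the Throwable you receive is a CompletionException wrapping the real cause, whereas a future failed directly with completeExceptionally passes the exception through unwrapped. A small sketch that copes with both (HandleExample and describe are illustrative names only):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical example: turns either outcome of a future into a String.
final class HandleExample {

    static CompletableFuture<String> describe(CompletableFuture<Integer> source) {
        return source.handle((result, error) -> {
            if (error != null) {
                // Unwrap CompletionException so callers see the original cause.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause()
                        : error;
                return "failed: " + cause.getMessage();
            }
            return "value: " + result;
        });
    }
}
```

Because handle() always produces a value, the returned future completes normally in both cases.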
Solution 3:[3]
I published a little futurity project that tries to do better than the straightforward way in the answer.
The main idea is to use only one thread (and, of course, not just a spin loop) to check the state of all Futures, which avoids blocking a pool thread for each Future -> CompletableFuture transformation.
Usage example:
Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Solution 4:[4]
Suggestion:
http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/
But, basically:
public class CompletablePromiseContext {
private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();
public static void schedule(Runnable r) {
SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
}
}
And, the CompletablePromise:
public class CompletablePromise<V> extends CompletableFuture<V> {
private Future<V> future;
public CompletablePromise(Future<V> future) {
this.future = future;
CompletablePromiseContext.schedule(this::tryToComplete);
}
private void tryToComplete() {
// isDone() is also true for a cancelled Future, so check cancellation first;
// otherwise get() would throw an uncaught CancellationException
if (future.isCancelled()) {
cancel(true);
return;
}
if (future.isDone()) {
try {
complete(future.get());
} catch (InterruptedException e) {
completeExceptionally(e);
} catch (ExecutionException e) {
completeExceptionally(e.getCause());
}
return;
}
CompletablePromiseContext.schedule(this::tryToComplete);
}
}
Example:
public class Main {
public static void main(String[] args) {
final ExecutorService service = Executors.newSingleThreadExecutor();
final Future<String> stringFuture = service.submit(() -> "success");
final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);
completableFuture.whenComplete((result, failure) -> {
System.out.println(result);
});
}
}
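A variation on the same polling idea, written as a static helper rather than a subclass. This is a sketch and not the linked article's code: the class name PollingFutures is invented, the 1 ms interval is simply carried over from the snippet above, and two things are assumed to be desirable: a daemon polling thread (so the JVM can exit) and cancellation that propagates from the CompletableFuture back to the wrapped Future.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: converts a Future by polling it from one shared thread.
final class PollingFutures {

    // One daemon thread serves every conversion and does not keep the JVM alive.
    private static final ScheduledExecutorService POLLER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "future-poller");
                thread.setDaemon(true);
                return thread;
            });

    static <T> CompletableFuture<T> toCompletable(Future<T> future) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Cancelling the CompletableFuture also cancels the wrapped Future.
        result.whenComplete((value, error) -> {
            if (result.isCancelled()) {
                future.cancel(true);
            }
        });
        poll(future, result);
        return result;
    }

    private static <T> void poll(Future<T> future, CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // already cancelled or completed by the caller: stop polling
        }
        if (!future.isDone()) {
            POLLER.schedule(() -> poll(future, result), 1, TimeUnit.MILLISECONDS);
            return;
        }
        try {
            result.complete(future.get()); // returns immediately: the future is done
        } catch (CancellationException e) {
            result.cancel(false);
        } catch (ExecutionException e) {
            result.completeExceptionally(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            result.completeExceptionally(e);
        }
    }
}
```

Usage is a single call, e.g. PollingFutures.toCompletable(service.submit(task)). The trade-off is unchanged: completion is noticed up to one polling interval late, in exchange for not tying up a thread per Future.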
Solution 5:[5]
Let me suggest another (hopefully, better) option: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata/concurrent
Briefly, the idea is the following:
- Introduce a CompletableTask<V> interface, the union of CompletionStage<V> + RunnableFuture<V>
- Wrap ExecutorService to return CompletableTask from submit(...) methods (instead of Future<V>)
- Done, we have runnable AND composable Futures.
The implementation uses an alternative CompletionStage implementation (note: CompletionStage rather than CompletableFuture).
Usage:
J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> result = exec
.submit( someCallableA )
.thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
.thenCombine( exec.submit(someCallableC), (ab, c) -> ab + " " + c);
Solution 6:[6]
public static <T> CompletableFuture<T> fromFuture(Future<T> f) {
return CompletableFuture.completedFuture(null).thenCompose(avoid -> {
try {
return CompletableFuture.completedFuture(f.get());
} catch (InterruptedException e) {
return CompletableFuture.failedFuture(e);
} catch (ExecutionException e) {
return CompletableFuture.failedFuture(e.getCause());
}
});
}
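Two things to be aware of with the snippet above: CompletableFuture.failedFuture requires Java 9 or later, and because completedFuture(null) is already complete, the thenCompose function runs immediately on the calling thread, so fromFuture itself blocks until f is done. If the goal is to keep the caller free, one option is to move the blocking get() onto an executor. The following is a Java 8 compatible sketch under that assumption; BlockingFutures is an invented name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

// Hypothetical helper: waits for the Future on a thread of the given executor.
final class BlockingFutures {

    static <T> CompletableFuture<T> fromFuture(Future<T> future, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get(); // blocks one executor thread until the future is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } catch (ExecutionException e) {
                // Surface the task's own exception rather than the wrapper.
                throw new CompletionException(e.getCause());
            }
        }, executor);
    }
}
```

This costs one blocked thread per pending Future. The waiting executor should not be the same bounded pool that runs the underlying task, otherwise the waiter can occupy the only thread the task needs and the two deadlock.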
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Kafkaesque |
| Solution 2 | |
| Solution 3 | |
| Solution 4 | |
| Solution 5 | Eugene |
| Solution 6 | eric goff |
