'Asynchronous single task executor

I'm in doubt if the solution below is correct for the following:

  • in a multi-user application any user can start processing of given data by clicking a button
  • processing takes long time thus it should be executed asynchronously to not block GUI
  • if one user has already started the processing, the other requests should be rejected till it completes

Below is the code used to solve this:

    public class Processor {
        private final ExecutorService execService = Executors.newSingleThreadExecutor();
        private final Object monitor = new Object();
        private AtomicReference<Task> runningTask = new AtomicReference<>(null);
        
        public Optional<CompletableFuture<String>> processDataAsync(String data) {
            if (runningTask.get() != null)
                return Optional.empty();  //rejecting data process request because another request is already being served
            
            synchronized (monitor) {
                if (runningTask.get() != null)
                    return Optional.empty();
                
                CompletableFuture<String> f = new CompletableFuture<>();
                f.whenComplete((r, e) -> runningTask.set(null));  //when processing completes, another data process request can be accepted
                
                Task task = new Task(f, data);
                runningTask.set(task);
                execService.submit(task);
                return Optional.of(f);
            }
        }
    }   

Task is Runnable as below:

    public class Task implements Runnable {
        private final CompletableFuture<String> result;
        private final String data;
        
        public Task(CompletableFuture<String> result, String data) {
            this.result = result;
            this.data = data;
        }
        
        @Override
        public void run() {
            String processingResult = processData(data);  //does some blocking stuff with data, returning result of processing
            result.complete(processingResult);
        }
    }

What confuses me here is the synchronization (i.e. blocking) in processDataAsync. I understand that blocking here is very short and not critical, but shouldn't asynchronous method be always implemented without blocking? If so, I can't imagine how "single processing" can be achieved without synchronization.



Solution 1:[1]

I think you could try to use the functionality of ExecutorService returned from Executors.newSingleThreadExecutor(). In JavaDoc of the mentioned method it's explained that:

/**
 * Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {/*...*/}

When you submit a Runnable to the ExecutorService returned from the method, it's put into the queue and executed only after the previous one is accomplished. I suppose this is what you need.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Sergey Tsypanov