Reworked Asynchronous tasks

This commit is contained in:
Quentin Legot 2022-11-06 21:31:58 +01:00
parent b98bc70c1f
commit ce6d16ef37
3 changed files with 57 additions and 76 deletions

View File

@ -1,23 +1,43 @@
package fr.altarik.toolbox.asynctasks; package fr.altarik.toolbox.asynctasks;
/** import java.util.concurrent.ExecutorService;
* A non-blocking small sized time-consuming tasks to executed asynchronously, this was developed mainly to be used to avoid to block main threads with mysql requests in mind import java.util.concurrent.Executors;
*/ import java.util.concurrent.TimeUnit;
public class AsyncTasks {
private static final TasksThread worker = new TasksThread(); /**
* A non-blocking small sized time-consuming tasks to executed asynchronously.
*/
public class AsyncTasks implements AutoCloseable {
private final ExecutorService worker;
private AsyncTasks(int numberOfWorker) {
if(numberOfWorker == 1) {
worker = Executors.newSingleThreadExecutor();
} else if (numberOfWorker <= 0) {
worker = Executors.newCachedThreadPool();
} else {
worker = Executors.newFixedThreadPool(numberOfWorker);
}
}
/** /**
* Call this method at startup or before first use of {@link AsyncTasks#addTask(Runnable)}, cause without it, nothing will work * Call this method at startup or before first use of {@link AsyncTasks#addTask(Runnable)}, cause without it, nothing will work
* This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed. * This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed.
*
* @return an instance of AsyncTasks
*/ */
public static void initialize() { public static AsyncTasks initialize(int numberOfWorker) {
worker.run(); return new AsyncTasks(numberOfWorker);
}
public static AsyncTasks initialize() {
return initialize(Runtime.getRuntime().availableProcessors());
} }
/** /**
* <p>Method used to add your task to a list of task, task are stored in a FIFO(First-In First-Out) implementation ({@link java.util.concurrent.BlockingQueue}). * <p>Method used to add your task to a list of task, task are stored in a FIFO(First-In First-Out) implementation ({@link java.util.concurrent.LinkedBlockingQueue}).
* As BlockingQueue is a synchronized class, all operations to add or remove elements inside cannot have collisions issues</p> * As LinkedBlockingQueue is a synchronized class, all operations to add or remove elements inside cannot have collisions issues</p>
* <p>Example: </p> * <p>Example: </p>
* <pre> * <pre>
* for(int i = 0; i &lt; 4; i++) { * for(int i = 0; i &lt; 4; i++) {
@ -40,28 +60,37 @@ public class AsyncTasks {
* task 3 * task 3
* 3 * 3
* </pre> * </pre>
* The worker thread is sleeping if it doesn't have task to execute and wake up if necessary when you add a task * The worker thread is sleeping if it doesn't have task to execute and wake up if necessary when you add a task.
* A worker which crash due to exception in the code of your task will automatically be recreated (see {@link java.util.concurrent.ThreadPoolExecutor} for more informations).
* @param function task to be executed * @param function task to be executed
* @throws InterruptedException when worker thread or BlockQueue has been interrupted while waiting (which is anormal) * @throws InterruptedException when worker thread or BlockQueue has been interrupted while waiting (which is anormal)
*/ */
public static void addTask(Runnable function) throws InterruptedException { public void addTask(Runnable function) throws InterruptedException {
if(worker.workerThread.isInterrupted()) if(worker.isTerminated() || worker.isShutdown()) {
throw new InterruptedException("Async task thread has been interrupted while waiting for another task, which is anormal, please report this issue to developers"); throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task");
worker.tasks.put(function);
// this condition is non-atomic, but we want to avoid unwanted and useless interruption in the main thread(s) while waiting for the worker thread to be released
if(worker.isWaiting) {
worker.lock.lock();
worker.lockSignal.signalAll();
worker.lock.unlock();
} }
worker.submit(function);
} }
/** /**
* Return the numbers of produced tasks which hasn't been consumed yet * This method is call when you want to close workers and wait for waiting tasks to finish
* @return numbers of produced tasks which hasn't been consumed yet *
*/ */
public static int numberOfWaitingTask() { @Override
return worker.tasks.size(); public void close() throws Exception {
worker.shutdown();
boolean result = worker.awaitTermination(10, TimeUnit.SECONDS);
if(!result) {
worker.shutdownNow();
throw new UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce");
}
}
public static class UnfinishedTasksException extends Exception {
public UnfinishedTasksException(String message) {
super(message);
} }
} }
}

View File

@ -1,44 +0,0 @@
package fr.altarik.toolbox.asynctasks;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Package private class, AsyncTasks is the interface user need to interact with it and users should never directly call this class
*/
class TasksThread {
boolean isWaiting = false;
final ReentrantLock lock = new ReentrantLock();
final Condition lockSignal = lock.newCondition();
Thread workerThread;
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
void run() {
workerThread = new Thread(() -> {
try {
for(;;) {
while(tasks.isEmpty()) {
lock.lock();
isWaiting = true;
lockSignal.await();
isWaiting = false;
lock.unlock();
}
while(!tasks.isEmpty()) {
tasks.take().run();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
workerThread.start();
}
}

View File

@ -9,31 +9,27 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
public class AsyncTaskTest { class AsyncTaskTest {
private String log(String message) { private String log(String message) {
return "[" + new Date() + "]" + message; return "[" + new Date() + "]" + message;
} }
@Test @Test
public void testAsyncOp() throws InterruptedException { void testAsyncOp() throws Exception {
int numberOfTasks = 10000; int numberOfTasks = 10000;
System.out.println("Initializing async tasks worker"); System.out.println("Initializing async tasks worker");
AsyncTasks.initialize(); AsyncTasks worker = AsyncTasks.initialize(1); // only testing on a single worker, otherwise result have a high chance to not be in the order we want
Stack<Integer> results = new Stack<>(); Stack<Integer> results = new Stack<>();
for(int i = 0; i < numberOfTasks; i++) { for(int i = 0; i < numberOfTasks; i++) {
System.out.println(log("sending task " + i)); System.out.println(log("sending task " + i));
AtomicInteger atomicInteger = new AtomicInteger(i); AtomicInteger atomicInteger = new AtomicInteger(i);
AsyncTasks.addTask(() -> { worker.addTask(() -> {
System.out.println(log(" task " + atomicInteger.get())); System.out.println(log(" task " + atomicInteger.get()));
results.push(atomicInteger.get()); results.push(atomicInteger.get());
}); });
} }
while(AsyncTasks.numberOfWaitingTask() != 0) { worker.close(); // wait until all worker terminated
synchronized (this) {
wait(20); // wait till last task finish
}
}
Integer[] expected = new Integer[numberOfTasks]; Integer[] expected = new Integer[numberOfTasks];
for(int i = 0; i < numberOfTasks; i++) { for(int i = 0; i < numberOfTasks; i++) {
expected[i] = i; expected[i] = i;