From ce6d16ef3760e2a0ade73128008570edb3c039f9 Mon Sep 17 00:00:00 2001 From: Quentin Legot Date: Sun, 6 Nov 2022 21:31:58 +0100 Subject: [PATCH] Reworked Asynchronous tasks --- .../toolbox/asynctasks/AsyncTasks.java | 75 +++++++++++++------ .../toolbox/asynctasks/TasksThread.java | 44 ----------- .../fr/altarik/toolbox/AsyncTaskTest.java | 14 ++-- 3 files changed, 57 insertions(+), 76 deletions(-) delete mode 100644 src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java diff --git a/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java b/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java index 2a0933a..0372b01 100644 --- a/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java +++ b/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java @@ -1,23 +1,43 @@ package fr.altarik.toolbox.asynctasks; -/** - * A non-blocking small sized time-consuming tasks to executed asynchronously, this was developed mainly to be used to avoid to block main threads with mysql requests in mind - */ -public class AsyncTasks { +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; - private static final TasksThread worker = new TasksThread(); +/** + * A non-blocking small sized time-consuming tasks to executed asynchronously. + */ +public class AsyncTasks implements AutoCloseable { + + private final ExecutorService worker; + + private AsyncTasks(int numberOfWorker) { + if(numberOfWorker == 1) { + worker = Executors.newSingleThreadExecutor(); + } else if (numberOfWorker <= 0) { + worker = Executors.newCachedThreadPool(); + } else { + worker = Executors.newFixedThreadPool(numberOfWorker); + } + } /** * Call this method at startup or before first use of {@link AsyncTasks#addTask(Runnable)}, cause without it, nothing will work * This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed. + * + * @return an instance of AsyncTasks */ - public static void initialize() { - worker.run(); + public static AsyncTasks initialize(int numberOfWorker) { + return new AsyncTasks(numberOfWorker); + } + + public static AsyncTasks initialize() { + return initialize(Runtime.getRuntime().availableProcessors()); } /** - *

Method used to add your task to a list of task, task are stored in a FIFO(First-In First-Out) implementation ({@link java.util.concurrent.BlockingQueue}). - * As BlockingQueue is a synchronized class, all operations to add or remove elements inside cannot have collisions issues

+ *

Method used to add your task to a list of task, task are stored in a FIFO(First-In First-Out) implementation ({@link java.util.concurrent.LinkedBlockingQueue}). + * As LinkedBlockingQueue is a synchronized class, all operations to add or remove elements inside cannot have collisions issues

*

Example:

*
      *      for(int i = 0; i < 4; i++) {
@@ -40,28 +60,37 @@ public class AsyncTasks {
      *     task 3
      *     3
      * 
- * The worker thread is sleeping if it doesn't have task to execute and wake up if necessary when you add a task + * The worker thread is sleeping if it doesn't have task to execute and wake up if necessary when you add a task. + * A worker which crash due to exception in the code of your task will automatically be recreated (see {@link java.util.concurrent.ThreadPoolExecutor} for more informations). * @param function task to be executed * @throws InterruptedException when worker thread or BlockQueue has been interrupted while waiting (which is anormal) */ - public static void addTask(Runnable function) throws InterruptedException { - if(worker.workerThread.isInterrupted()) - throw new InterruptedException("Async task thread has been interrupted while waiting for another task, which is anormal, please report this issue to developers"); - worker.tasks.put(function); - // this condition is non-atomic, but we want to avoid unwanted and useless interruption in the main thread(s) while waiting for the worker thread to be released - if(worker.isWaiting) { - worker.lock.lock(); - worker.lockSignal.signalAll(); - worker.lock.unlock(); + public void addTask(Runnable function) throws InterruptedException { + if(worker.isTerminated() || worker.isShutdown()) { + throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task"); } + worker.submit(function); } /** - * Return the numbers of produced tasks which hasn't been consumed yet - * @return numbers of produced tasks which hasn't been consumed yet + * This method is call when you want to close workers and wait for waiting tasks to finish + * */ - public static int numberOfWaitingTask() { - return worker.tasks.size(); + @Override + public void close() throws Exception { + worker.shutdown(); + boolean result = worker.awaitTermination(10, TimeUnit.SECONDS); + if(!result) { + worker.shutdownNow(); + throw new UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce"); + } } + public static class UnfinishedTasksException extends Exception { + + public UnfinishedTasksException(String message) { + super(message); + } + + } } diff --git a/src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java b/src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java deleted file mode 100644 index 96e0ebb..0000000 --- a/src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java +++ /dev/null @@ -1,44 +0,0 @@ -package fr.altarik.toolbox.asynctasks; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Package private class, AsyncTasks is the interface user need to interact with it and users should never directly call this class - */ -class TasksThread { - - boolean isWaiting = false; - final ReentrantLock lock = new ReentrantLock(); - final Condition lockSignal = lock.newCondition(); - Thread workerThread; - - final BlockingQueue tasks = new LinkedBlockingQueue<>(); - - void run() { - workerThread = new Thread(() -> { - - try { - for(;;) { - while(tasks.isEmpty()) { - lock.lock(); - isWaiting = true; - lockSignal.await(); - isWaiting = false; - lock.unlock(); - } - while(!tasks.isEmpty()) { - tasks.take().run(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - workerThread.start(); - - } - -} diff --git a/src/test/java/fr/altarik/toolbox/AsyncTaskTest.java b/src/test/java/fr/altarik/toolbox/AsyncTaskTest.java index 0ff890e..6c73178 100644 --- a/src/test/java/fr/altarik/toolbox/AsyncTaskTest.java +++ b/src/test/java/fr/altarik/toolbox/AsyncTaskTest.java @@ -9,31 +9,27 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertArrayEquals; -public class AsyncTaskTest { +class AsyncTaskTest { private String log(String message) { return "[" + new Date() + "]" + message; } @Test - public void testAsyncOp() throws InterruptedException { + void testAsyncOp() throws Exception { int numberOfTasks = 10000; System.out.println("Initializing async tasks worker"); - AsyncTasks.initialize(); + AsyncTasks worker = AsyncTasks.initialize(1); // only testing on a single worker, otherwise result have a high chance to not be in the order we want Stack results = new Stack<>(); for(int i = 0; i < numberOfTasks; i++) { System.out.println(log("sending task " + i)); AtomicInteger atomicInteger = new AtomicInteger(i); - AsyncTasks.addTask(() -> { + worker.addTask(() -> { System.out.println(log(" task " + atomicInteger.get())); results.push(atomicInteger.get()); }); } - while(AsyncTasks.numberOfWaitingTask() != 0) { - synchronized (this) { - wait(20); // wait till last task finish - } - } + worker.close(); // wait until all worker terminated Integer[] expected = new Integer[numberOfTasks]; for(int i = 0; i < numberOfTasks; i++) { expected[i] = i;