diff --git a/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java b/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java index 2a0933a..0372b01 100644 --- a/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java +++ b/src/main/java/fr/altarik/toolbox/asynctasks/AsyncTasks.java @@ -1,23 +1,43 @@ package fr.altarik.toolbox.asynctasks; -/** - * A non-blocking small sized time-consuming tasks to executed asynchronously, this was developed mainly to be used to avoid to block main threads with mysql requests in mind - */ -public class AsyncTasks { +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; - private static final TasksThread worker = new TasksThread(); +/** + * A non-blocking small sized time-consuming tasks to executed asynchronously. + */ +public class AsyncTasks implements AutoCloseable { + + private final ExecutorService worker; + + private AsyncTasks(int numberOfWorker) { + if(numberOfWorker == 1) { + worker = Executors.newSingleThreadExecutor(); + } else if (numberOfWorker <= 0) { + worker = Executors.newCachedThreadPool(); + } else { + worker = Executors.newFixedThreadPool(numberOfWorker); + } + } /** * Call this method at startup or before first use of {@link AsyncTasks#addTask(Runnable)}, cause without it, nothing will work * This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed. + * + * @return an instance of AsyncTasks */ - public static void initialize() { - worker.run(); + public static AsyncTasks initialize(int numberOfWorker) { + return new AsyncTasks(numberOfWorker); + } + + public static AsyncTasks initialize() { + return initialize(Runtime.getRuntime().availableProcessors()); } /** - *
Method used to add your task to a list of task, task are stored in a FIFO(First-In First-Out) implementation ({@link java.util.concurrent.BlockingQueue}). - * As BlockingQueue is a synchronized class, all operations to add or remove elements inside cannot have collisions issues
+ *Method used to add your task to a list of task, task are stored in a FIFO(First-In First-Out) implementation ({@link java.util.concurrent.LinkedBlockingQueue}). + * As LinkedBlockingQueue is a synchronized class, all operations to add or remove elements inside cannot have collisions issues
*Example:
** for(int i = 0; i < 4; i++) { @@ -40,28 +60,37 @@ public class AsyncTasks { * task 3 * 3 *- * The worker thread is sleeping if it doesn't have task to execute and wake up if necessary when you add a task + * The worker thread is sleeping if it doesn't have task to execute and wake up if necessary when you add a task. + * A worker which crash due to exception in the code of your task will automatically be recreated (see {@link java.util.concurrent.ThreadPoolExecutor} for more informations). * @param function task to be executed * @throws InterruptedException when worker thread or BlockQueue has been interrupted while waiting (which is anormal) */ - public static void addTask(Runnable function) throws InterruptedException { - if(worker.workerThread.isInterrupted()) - throw new InterruptedException("Async task thread has been interrupted while waiting for another task, which is anormal, please report this issue to developers"); - worker.tasks.put(function); - // this condition is non-atomic, but we want to avoid unwanted and useless interruption in the main thread(s) while waiting for the worker thread to be released - if(worker.isWaiting) { - worker.lock.lock(); - worker.lockSignal.signalAll(); - worker.lock.unlock(); + public void addTask(Runnable function) throws InterruptedException { + if(worker.isTerminated() || worker.isShutdown()) { + throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task"); } + worker.submit(function); } /** - * Return the numbers of produced tasks which hasn't been consumed yet - * @return numbers of produced tasks which hasn't been consumed yet + * This method is call when you want to close workers and wait for waiting tasks to finish + * */ - public static int numberOfWaitingTask() { - return worker.tasks.size(); + @Override + public void close() throws Exception { + worker.shutdown(); + boolean result = worker.awaitTermination(10, TimeUnit.SECONDS); + if(!result) { + worker.shutdownNow(); + throw new UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce"); + } } + public static class UnfinishedTasksException extends Exception { + + public UnfinishedTasksException(String message) { + super(message); + } + + } } diff --git a/src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java b/src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java deleted file mode 100644 index 96e0ebb..0000000 --- a/src/main/java/fr/altarik/toolbox/asynctasks/TasksThread.java +++ /dev/null @@ -1,44 +0,0 @@ -package fr.altarik.toolbox.asynctasks; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Package private class, AsyncTasks is the interface user need to interact with it and users should never directly call this class - */ -class TasksThread { - - boolean isWaiting = false; - final ReentrantLock lock = new ReentrantLock(); - final Condition lockSignal = lock.newCondition(); - Thread workerThread; - - final BlockingQueue