diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/AsyncTaskI.java b/Tasks/src/main/java/fr/altarik/toolbox/task/AsyncTaskI.java new file mode 100644 index 0000000..c5b771d --- /dev/null +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/AsyncTaskI.java @@ -0,0 +1,4 @@ +package fr.altarik.toolbox.task; + +public interface AsyncTaskI extends TaskI, AutoCloseable { +} diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/TaskI.java b/Tasks/src/main/java/fr/altarik/toolbox/task/TaskI.java index 8a45bec..a98753c 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/TaskI.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/TaskI.java @@ -1,6 +1,6 @@ package fr.altarik.toolbox.task; -public interface TaskI extends AutoCloseable { +public interface TaskI { public void addTask(AltarikRunnable function) throws InterruptedException; diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/TaskScheduler.java b/Tasks/src/main/java/fr/altarik/toolbox/task/TaskScheduler.java new file mode 100644 index 0000000..f990081 --- /dev/null +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/TaskScheduler.java @@ -0,0 +1,56 @@ +package fr.altarik.toolbox.task; + +import fr.altarik.toolbox.task.syncTasks.SchedulerTaskData; + +import java.util.Vector; + +public class TaskScheduler { + private Vector asyncTasks; + private boolean stop = false; + + + public synchronized void sendAsyncTask(AltarikRunnable function, long delay, long period) throws InterruptedException { + asyncTasks.addElement(new SchedulerTaskData(function, delay, period)); + notify(); + } + + /** + * Function executed in asynchronous workers with periodic tasks + */ + public synchronized void asyncRunnerPeriodicTasks() throws InterruptedException { + loop: while(true) { + notify(); + while(asyncTasks.size() == 0) { + if(isStop()) { + break loop; + } + wait(); + } + SchedulerTaskData data = asyncTasks.firstElement(); + asyncTasks.remove(data); + if(!data.getFunction().isCancelled()) { + long currentDelay = data.getCurrentDelay(); + if(currentDelay != 0) { + data.setCurrentDelay(currentDelay - 1); + asyncTasks.addElement(data); + } else { + data.getFunction().run(); + data.setCurrentDelay(data.getPeriod()); + asyncTasks.addElement(data); + } + } + } + } + + public synchronized boolean isStop() { + return stop; + } + + public synchronized void setStop(boolean stop) { + this.stop = stop; + } + + public int getNumberOfTasks() { + return asyncTasks.size(); + } +} diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncPeriodicTasks.java b/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncPeriodicTasks.java new file mode 100644 index 0000000..de31e44 --- /dev/null +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncPeriodicTasks.java @@ -0,0 +1,83 @@ +package fr.altarik.toolbox.task.asyncTasks; + +import fr.altarik.toolbox.task.*; +import it.unimi.dsi.fastutil.ints.IntComparators; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI { + + private final ExecutorService worker; + private final List schedulers; + + private AsyncPeriodicTasks(int numberOfWorker) { + int size = 0; + if(numberOfWorker == 1) { + worker = Executors.newSingleThreadExecutor(); + size = 1; + } else if (numberOfWorker <= 0) { + worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + size = Runtime.getRuntime().availableProcessors(); + } else { + worker = Executors.newFixedThreadPool(numberOfWorker); + size = numberOfWorker; + } + this.schedulers = new ArrayList<>(size); + for(int i = 0; i < size; i++) { + TaskScheduler scheduler = new TaskScheduler(); + schedulers.add(scheduler); + worker.submit(() -> { + try { + scheduler.asyncRunnerPeriodicTasks(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + } + + /** + * Call this method at startup or before first use of {@link AsyncTasks#addTask(AltarikRunnable)}, cause without it, nothing will work + * This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed. + * + * @return an instance of AsyncTasks + */ + public static TaskI initialize(int numberOfWorker) { + return new AsyncPeriodicTasks(numberOfWorker); + } + + public static TaskI initialize() { + return initialize(Runtime.getRuntime().availableProcessors()); + } + + @Override + public void addTask(AltarikRunnable function) throws InterruptedException { + this.addTask(function, 0, 1); + } + + @Override + public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException { + if(worker.isTerminated() || worker.isShutdown()) { + throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task"); + } + schedulers.stream() + .min((o1, o2) -> IntComparators.NATURAL_COMPARATOR.compare(o1.getNumberOfTasks(), o2.getNumberOfTasks())) + .orElseThrow() + .sendAsyncTask(function, delay, period); + } + + @Override + public void close() throws Exception { + schedulers.forEach(s -> s.setStop(true)); + worker.shutdown(); + boolean result = worker.awaitTermination(10, TimeUnit.SECONDS); + if(!result) { + worker.shutdownNow(); + throw new AsyncTasks.UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce"); + } + } +} diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncTasks.java b/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncTasks.java index e99a9e2..50fcc43 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncTasks.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/asyncTasks/AsyncTasks.java @@ -18,14 +18,14 @@ public class AsyncTasks implements TaskI { if(numberOfWorker == 1) { worker = Executors.newSingleThreadExecutor(); } else if (numberOfWorker <= 0) { - worker = Executors.newCachedThreadPool(); + worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } else { worker = Executors.newFixedThreadPool(numberOfWorker); } } /** - * Call this method at startup or before first use of {@link AsyncTasks#addTask(Runnable)}, cause without it, nothing will work + * Call this method at startup or before first use of {@link AsyncTasks#addTask(AltarikRunnable)}, cause without it, nothing will work * This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed. * * @return an instance of AsyncTasks diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/PeriodicSyncTask.java b/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/PeriodicSyncTask.java index 9da0aa5..0c15f4e 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/PeriodicSyncTask.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/PeriodicSyncTask.java @@ -1,12 +1,13 @@ package fr.altarik.toolbox.task.syncTasks; import fr.altarik.toolbox.task.AltarikRunnable; +import fr.altarik.toolbox.task.PeriodicTaskI; import fr.altarik.toolbox.task.TaskI; import java.util.ArrayList; import java.util.List; -public class PeriodicSyncTask implements TaskI, Runnable { +public class PeriodicSyncTask implements PeriodicTaskI, Runnable { private ServerTickListener listener; private List tasks; @@ -26,11 +27,6 @@ public class PeriodicSyncTask implements TaskI, Runnable { tasks.add(function); } - @Override - public void close() throws Exception { - - } - @Override public void run() { List removeList = new ArrayList<>(tasks.size()); @@ -44,4 +40,9 @@ public class PeriodicSyncTask implements TaskI, Runnable { } tasks.removeAll(removeList); } + + @Override + public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException { + + } } diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/SchedulerTaskData.java b/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/SchedulerTaskData.java new file mode 100644 index 0000000..2eb5ee8 --- /dev/null +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/syncTasks/SchedulerTaskData.java @@ -0,0 +1,39 @@ +package fr.altarik.toolbox.task.syncTasks; + +import fr.altarik.toolbox.task.AltarikRunnable; + +public class SchedulerTaskData { + + private final long delay; + private final long period; + private final AltarikRunnable function; + + private long currentDelay; + + public SchedulerTaskData(AltarikRunnable function, long delay, long period) { + this.function = function; + this.delay = delay; + this.period = period; + this.currentDelay = delay; + } + + public AltarikRunnable getFunction() { + return function; + } + + public long getCurrentDelay() { + return currentDelay; + } + + public void setCurrentDelay(long currentDelay) { + this.currentDelay = currentDelay; + } + + public long getDelay() { + return delay; + } + + public long getPeriod() { + return period; + } +}