From c32e72dda643252414f5d025e888b9acee037d34 Mon Sep 17 00:00:00 2001 From: Quentin Legot Date: Mon, 13 Feb 2023 14:17:03 +0100 Subject: [PATCH] Move each task scheduler to a specific class: Scheduler.java, AsyncPeriodicTasks.java scheduler is now on server thread (send task to async worker) Signed-off-by: Quentin Legot --- Tasks/build.gradle | 2 +- .../altarik/toolbox/task/AltarikRunnable.java | 2 +- .../fr/altarik/toolbox/task/Scheduler.java | 40 +++++++++++ .../altarik/toolbox/task/SendTaskWorkerI.java | 6 ++ .../task/{sync => }/ServerTickListener.java | 2 +- .../task/async/AsyncPeriodicTasks.java | 43 +++++------- .../toolbox/task/async/TaskScheduler.java | 67 ------------------- .../toolbox/task/sync/PeriodicSyncTask.java | 37 +++------- .../altarik/toolbox/task/sync/SyncTask.java | 32 +++------ .../task/async/PeriodicAsyncTaskTest.java | 31 ++++++++- .../task/sync/PeriodicSyncTaskTest.java | 2 +- .../toolbox/task/sync/SyncTaskTest.java | 8 +-- 12 files changed, 115 insertions(+), 157 deletions(-) create mode 100644 Tasks/src/main/java/fr/altarik/toolbox/task/Scheduler.java create mode 100644 Tasks/src/main/java/fr/altarik/toolbox/task/SendTaskWorkerI.java rename Tasks/src/main/java/fr/altarik/toolbox/task/{sync => }/ServerTickListener.java (92%) delete mode 100644 Tasks/src/main/java/fr/altarik/toolbox/task/async/TaskScheduler.java diff --git a/Tasks/build.gradle b/Tasks/build.gradle index adcbbf1..590944e 100644 --- a/Tasks/build.gradle +++ b/Tasks/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'fabric-loom' version '1.0-SNAPSHOT' + id 'fabric-loom' version '1.1-SNAPSHOT' } diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/AltarikRunnable.java b/Tasks/src/main/java/fr/altarik/toolbox/task/AltarikRunnable.java index bdc3470..941c6ff 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/AltarikRunnable.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/AltarikRunnable.java @@ -5,7 +5,7 @@ public abstract class AltarikRunnable implements Runnable { private boolean isCancelled = false; /** - * Warning: Some task cannot be cancelled (mostly async tasks like {@link fr.altarik.toolbox.task.async.AsyncTasks} + * Warning: Some task cannot be cancelled (mostly async tasks like {@link fr.altarik.toolbox.task.async.AsyncTasks}) * The result of this call is ignored in this case, you can still add a way to not execute its content (like if(isCancelled) return;) */ public void cancel() { diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/Scheduler.java b/Tasks/src/main/java/fr/altarik/toolbox/task/Scheduler.java new file mode 100644 index 0000000..c3b5d99 --- /dev/null +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/Scheduler.java @@ -0,0 +1,40 @@ +package fr.altarik.toolbox.task; + +import java.util.ArrayList; +import java.util.List; + +public class Scheduler implements Runnable { + + private final List tasks; + private final SendTaskWorkerI worker; + + public Scheduler(SendTaskWorkerI worker, List tasks) { + this.worker = worker; + this.tasks = tasks; + } + + @Override + public void run() { + List removeList = new ArrayList<>(tasks.size()); + for(SchedulerTaskData data : tasks) { + if(!data.getFunction().isCancelled()) { + long currentDelay = data.getCurrentDelay(); + if(currentDelay != 0) { + data.setCurrentDelay(currentDelay - 1); + } else { + worker.sendTask(data.getFunction()); + if(data.getPeriod() == -1) { + removeList.add(data); + } else { + data.setCurrentDelay(data.getPeriod()); + } + } + } else { + removeList.add(data); + } + } + for(SchedulerTaskData toRemove : removeList) { + tasks.remove(toRemove); + } + } +} diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/SendTaskWorkerI.java b/Tasks/src/main/java/fr/altarik/toolbox/task/SendTaskWorkerI.java new file mode 100644 index 0000000..01f354d --- /dev/null +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/SendTaskWorkerI.java @@ -0,0 +1,6 @@ +package fr.altarik.toolbox.task; + +public interface SendTaskWorkerI { + + void sendTask(AltarikRunnable task); +} diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/sync/ServerTickListener.java b/Tasks/src/main/java/fr/altarik/toolbox/task/ServerTickListener.java similarity index 92% rename from Tasks/src/main/java/fr/altarik/toolbox/task/sync/ServerTickListener.java rename to Tasks/src/main/java/fr/altarik/toolbox/task/ServerTickListener.java index d70562d..cac9623 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/sync/ServerTickListener.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/ServerTickListener.java @@ -1,4 +1,4 @@ -package fr.altarik.toolbox.task.sync; +package fr.altarik.toolbox.task; import net.fabricmc.fabric.api.event.lifecycle.v1.ServerTickEvents; import net.minecraft.server.MinecraftServer; diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/async/AsyncPeriodicTasks.java b/Tasks/src/main/java/fr/altarik/toolbox/task/async/AsyncPeriodicTasks.java index 1141751..1e095f8 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/async/AsyncPeriodicTasks.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/async/AsyncPeriodicTasks.java @@ -1,44 +1,30 @@ package fr.altarik.toolbox.task.async; -import fr.altarik.toolbox.task.AltarikRunnable; -import fr.altarik.toolbox.task.PeriodicTaskI; -import it.unimi.dsi.fastutil.ints.IntComparators; +import fr.altarik.toolbox.task.*; -import java.util.ArrayList; -import java.util.List; +import java.util.Stack; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI { +public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI, SendTaskWorkerI { private final ExecutorService worker; - private final List schedulers; + private final Stack tasks; + protected final Scheduler scheduler; + private final ServerTickListener listener; private AsyncPeriodicTasks(int numberOfWorker) { - int size = 0; if(numberOfWorker == 1) { worker = Executors.newSingleThreadExecutor(); - size = 1; } else if (numberOfWorker <= 0) { worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - size = Runtime.getRuntime().availableProcessors(); } else { worker = Executors.newFixedThreadPool(numberOfWorker); - size = numberOfWorker; - } - this.schedulers = new ArrayList<>(size); - for(int i = 0; i < size; i++) { - TaskScheduler scheduler = new TaskScheduler(); - schedulers.add(scheduler); - worker.submit(() -> { - try { - scheduler.asyncRunnerPeriodicTasks(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); } + tasks = new Stack<>(); + this.scheduler = new Scheduler(this, tasks); + this.listener = new ServerTickListener(scheduler); } /** @@ -65,15 +51,11 @@ public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI { if(worker.isTerminated() || worker.isShutdown()) { throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task"); } - schedulers.stream() - .min((o1, o2) -> IntComparators.NATURAL_COMPARATOR.compare(o1.getNumberOfTasks(), o2.getNumberOfTasks())) - .orElseThrow() - .sendAsyncTask(function, delay, period); + tasks.add(new SchedulerTaskData(function, delay, period - 1)); } @Override public void close() throws Exception { - schedulers.forEach(s -> s.setStop(true)); worker.shutdown(); boolean result = worker.awaitTermination(10, TimeUnit.SECONDS); if(!result) { @@ -81,4 +63,9 @@ public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI { throw new AsyncTasks.UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce"); } } + + @Override + public void sendTask(AltarikRunnable task) { + worker.submit(task); + } } diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/async/TaskScheduler.java b/Tasks/src/main/java/fr/altarik/toolbox/task/async/TaskScheduler.java deleted file mode 100644 index 4bc2e74..0000000 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/async/TaskScheduler.java +++ /dev/null @@ -1,67 +0,0 @@ -package fr.altarik.toolbox.task.async; - -import fr.altarik.toolbox.task.AltarikRunnable; -import fr.altarik.toolbox.task.SchedulerTaskData; - -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; -import java.util.Vector; - -public class TaskScheduler { - private Vector asyncTasks; - - /** - * Return last time the method was executed, Value initialized to now when using sending the task to scheduler - */ - private HashMap lastTimeExecution; - private boolean stop = false; - - - public synchronized void sendAsyncTask(AltarikRunnable function, long delay, long period) throws InterruptedException { - SchedulerTaskData data = new SchedulerTaskData(function, delay, period); - asyncTasks.addElement(data); - lastTimeExecution.put(new SchedulerTaskData(function, delay, period), Instant.now()); - notify(); - } - - /** - * Function executed in asynchronous workers with periodic tasks - */ - public synchronized void asyncRunnerPeriodicTasks() throws InterruptedException { - loop: while(true) { - while(asyncTasks.size() == 0) { - if(isStop()) { - break loop; - } - wait(); - } - SchedulerTaskData data = asyncTasks.firstElement(); - asyncTasks.remove(data); - if(!data.getFunction().isCancelled()) { - long currentDelay = data.getCurrentDelay(); - Instant currentTime = Instant.now(); - Instant lastExecution = lastTimeExecution.get(data); - // (lastExec + delay) - currentTime - if(lastExecution.plus(currentDelay, ChronoUnit.MILLIS).isBefore(currentTime)) { - data.getFunction().run(); - data.setCurrentDelay(data.getPeriod()); - lastTimeExecution.put(data, Instant.now()); - asyncTasks.addElement(data); - } - } - } - } - - public synchronized boolean isStop() { - return stop; - } - - public synchronized void setStop(boolean stop) { - this.stop = stop; - } - - public int getNumberOfTasks() { - return asyncTasks.size(); - } -} diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/sync/PeriodicSyncTask.java b/Tasks/src/main/java/fr/altarik/toolbox/task/sync/PeriodicSyncTask.java index 381c263..e0c5e1f 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/sync/PeriodicSyncTask.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/sync/PeriodicSyncTask.java @@ -1,20 +1,20 @@ package fr.altarik.toolbox.task.sync; -import fr.altarik.toolbox.task.AltarikRunnable; -import fr.altarik.toolbox.task.PeriodicTaskI; -import fr.altarik.toolbox.task.SchedulerTaskData; +import fr.altarik.toolbox.task.*; import java.util.ArrayList; import java.util.List; -public class PeriodicSyncTask implements PeriodicTaskI, Runnable { +public class PeriodicSyncTask implements PeriodicTaskI, SendTaskWorkerI { private final ServerTickListener listener; private final List tasks; + protected final Scheduler scheduler; private PeriodicSyncTask() { - this.listener = new ServerTickListener(this); this.tasks = new ArrayList<>(2); + this.scheduler = new Scheduler(this, tasks); + this.listener = new ServerTickListener(scheduler); } @@ -27,30 +27,13 @@ public class PeriodicSyncTask implements PeriodicTaskI, Runnable { addTask(function, 0, 1); } - @Override - public void run() { - List removeList = new ArrayList<>(tasks.size()); - for(SchedulerTaskData data : tasks) { - if(!data.getFunction().isCancelled()) { - long currentDelay = data.getCurrentDelay(); - if(currentDelay != 0) { - data.setCurrentDelay(currentDelay - 1); - } else { - data.getFunction().run(); - data.setCurrentDelay(data.getPeriod()); - } - } else { - removeList.add(data); - } - } - for(SchedulerTaskData toRemove : removeList) { - tasks.remove(toRemove); - } - - } - @Override public void addTask(AltarikRunnable function, long delay, long period) { tasks.add(new SchedulerTaskData(function, delay, period - 1)); } + + @Override + public void sendTask(AltarikRunnable task) { + task.run(); + } } diff --git a/Tasks/src/main/java/fr/altarik/toolbox/task/sync/SyncTask.java b/Tasks/src/main/java/fr/altarik/toolbox/task/sync/SyncTask.java index 749fe0c..19214e2 100644 --- a/Tasks/src/main/java/fr/altarik/toolbox/task/sync/SyncTask.java +++ b/Tasks/src/main/java/fr/altarik/toolbox/task/sync/SyncTask.java @@ -1,20 +1,20 @@ package fr.altarik.toolbox.task.sync; -import fr.altarik.toolbox.task.AltarikRunnable; -import fr.altarik.toolbox.task.SchedulerTaskData; -import fr.altarik.toolbox.task.TaskI; +import fr.altarik.toolbox.task.*; import java.util.ArrayList; import java.util.List; -public class SyncTask implements TaskI, Runnable { +public class SyncTask implements TaskI, SendTaskWorkerI { private final ServerTickListener listener; private final List tasks; + protected final Scheduler scheduler; private SyncTask() { - this.listener = new ServerTickListener(this); this.tasks = new ArrayList<>(2); + this.scheduler = new Scheduler(this, tasks); + this.listener = new ServerTickListener(scheduler); } public static TaskI initialize() { @@ -27,27 +27,11 @@ public class SyncTask implements TaskI, Runnable { } public void addTask(AltarikRunnable function, int delay) { - tasks.add(new SchedulerTaskData(function, delay, 0)); + tasks.add(new SchedulerTaskData(function, delay, -1)); } @Override - public void run() { - List removeList = new ArrayList<>(tasks.size()); - for(SchedulerTaskData data : tasks) { - if(!data.getFunction().isCancelled()) { - long currentDelay = data.getCurrentDelay(); - if(currentDelay != 0) { - data.setCurrentDelay(currentDelay - 1); - } else { - data.getFunction().run(); - removeList.add(data); - } - } else { - removeList.add(data); - } - } - for(SchedulerTaskData toRemove : removeList) { - tasks.remove(toRemove); - } + public void sendTask(AltarikRunnable task) { + task.run(); } } diff --git a/Tasks/src/test/java/fr/altarik/toolbox/task/async/PeriodicAsyncTaskTest.java b/Tasks/src/test/java/fr/altarik/toolbox/task/async/PeriodicAsyncTaskTest.java index 29e0fe6..9797c2b 100644 --- a/Tasks/src/test/java/fr/altarik/toolbox/task/async/PeriodicAsyncTaskTest.java +++ b/Tasks/src/test/java/fr/altarik/toolbox/task/async/PeriodicAsyncTaskTest.java @@ -1,6 +1,7 @@ package fr.altarik.toolbox.task.async; import fr.altarik.toolbox.task.AltarikRunnable; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Stack; @@ -9,24 +10,48 @@ import java.util.concurrent.atomic.AtomicInteger; public class PeriodicAsyncTaskTest { @Test - void testPeriodicASyncTask() { - AsyncPeriodicTasks worker = (AsyncPeriodicTasks) AsyncPeriodicTasks.initialize(); + void testPeriodicASyncTask() throws Exception { + AsyncPeriodicTasks worker = (AsyncPeriodicTasks) AsyncPeriodicTasks.initialize(1); Stack results = new Stack<>(); AtomicInteger value1 = new AtomicInteger(1); AtomicInteger value2 = new AtomicInteger(2); AltarikRunnable runnable1 = new AltarikRunnable() { + private int i = 0; @Override public void run() { results.add(value1); + i++; + if(i == 2) + cancel(); } }; AltarikRunnable runnable2 = new AltarikRunnable() { + private int i = 0; @Override public void run() { results.add(value2); + i++; + if(i == 4) + cancel(); } }; - + worker.addTask(runnable1, 1, 4); + worker.addTask(runnable2, 0, 2); + for(int i = 0; i < 10; i++) { + worker.scheduler.run(); + } + AtomicInteger[] expected = { + value2, + value1, + value2, + value2, + value1, + value2, + value2, + value1 + }; + worker.close(); + Assertions.assertArrayEquals(expected, results.toArray()); } } diff --git a/Tasks/src/test/java/fr/altarik/toolbox/task/sync/PeriodicSyncTaskTest.java b/Tasks/src/test/java/fr/altarik/toolbox/task/sync/PeriodicSyncTaskTest.java index 2a5565c..c252375 100644 --- a/Tasks/src/test/java/fr/altarik/toolbox/task/sync/PeriodicSyncTaskTest.java +++ b/Tasks/src/test/java/fr/altarik/toolbox/task/sync/PeriodicSyncTaskTest.java @@ -33,7 +33,7 @@ class PeriodicSyncTaskTest { } }); for(int i = 0; i < 10; i++) { - worker.run(); + worker.scheduler.run(); } AtomicInteger[] expected = { value2, diff --git a/Tasks/src/test/java/fr/altarik/toolbox/task/sync/SyncTaskTest.java b/Tasks/src/test/java/fr/altarik/toolbox/task/sync/SyncTaskTest.java index b700843..44f5b9f 100644 --- a/Tasks/src/test/java/fr/altarik/toolbox/task/sync/SyncTaskTest.java +++ b/Tasks/src/test/java/fr/altarik/toolbox/task/sync/SyncTaskTest.java @@ -29,14 +29,14 @@ class SyncTaskTest { } }; worker.addTask(task1); - worker.run(); - worker.run(); + worker.scheduler.run(); + worker.scheduler.run(); worker.addTask(task2); worker.addTask(task1); worker.addTask(task2); - worker.run(); + worker.scheduler.run(); worker.addTask(task1); - worker.run(); + worker.scheduler.run(); AtomicInteger[] expected = { value1, value2,