Compare commits

...

3 Commits

Author SHA1 Message Date
0f902faa1b Fix fabric.mod.json, add run folder to gitignore
Signed-off-by: Quentin Legot <legotquentin@gmail.com>
2023-02-13 15:25:08 +01:00
f5db55e96e Added tests 2023-02-13 15:00:13 +01:00
c32e72dda6 Move each task scheduler to a specific class: Scheduler.java, AsyncPeriodicTasks.java scheduler is now on server thread (send task to async worker)
Signed-off-by: Quentin Legot <legotquentin@gmail.com>
2023-02-13 14:17:03 +01:00
17 changed files with 188 additions and 164 deletions

2
.gitignore vendored
View File

@ -4,6 +4,8 @@ build/
!**/src/main/**/build/ !**/src/main/**/build/
!**/src/test/**/build/ !**/src/test/**/build/
Tasks/run
### IntelliJ IDEA ### ### IntelliJ IDEA ###
.idea .idea
*.iws *.iws

View File

@ -1,5 +1,5 @@
plugins { plugins {
id 'fabric-loom' version '1.0-SNAPSHOT' id 'fabric-loom' version '1.1-SNAPSHOT'
} }

View File

@ -5,7 +5,7 @@ public abstract class AltarikRunnable implements Runnable {
private boolean isCancelled = false; private boolean isCancelled = false;
/** /**
* Warning: Some task cannot be cancelled (mostly async tasks like {@link fr.altarik.toolbox.task.async.AsyncTasks} * Warning: Some task cannot be cancelled (mostly async tasks like {@link fr.altarik.toolbox.task.async.AsyncTasks})
* The result of this call is ignored in this case, you can still add a way to not execute its content (like if(isCancelled) return;) * The result of this call is ignored in this case, you can still add a way to not execute its content (like if(isCancelled) return;)
*/ */
public void cancel() { public void cancel() {

View File

@ -10,5 +10,5 @@ public interface PeriodicTaskI extends TaskI {
* @throws InterruptedException When executed asynchronously, task may be interrupted * @throws InterruptedException When executed asynchronously, task may be interrupted
* @see fr.altarik.toolbox.task.sync.PeriodicSyncTask * @see fr.altarik.toolbox.task.sync.PeriodicSyncTask
*/ */
public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException; void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException;
} }

View File

@ -0,0 +1,40 @@
package fr.altarik.toolbox.task;
import java.util.ArrayList;
import java.util.List;
public class Scheduler implements Runnable {
private final List<SchedulerTaskData> tasks;
private final SendTaskWorkerI worker;
public Scheduler(SendTaskWorkerI worker, List<SchedulerTaskData> tasks) {
this.worker = worker;
this.tasks = tasks;
}
@Override
public void run() {
List<SchedulerTaskData> removeList = new ArrayList<>(tasks.size());
for(SchedulerTaskData data : tasks) {
if(!data.getFunction().isCancelled()) {
long currentDelay = data.getCurrentDelay();
if(currentDelay != 0) {
data.setCurrentDelay(currentDelay - 1);
} else {
worker.sendTask(data.getFunction());
if(data.getPeriod() == -1) {
removeList.add(data);
} else {
data.setCurrentDelay(data.getPeriod());
}
}
} else {
removeList.add(data);
}
}
for(SchedulerTaskData toRemove : removeList) {
tasks.remove(toRemove);
}
}
}

View File

@ -0,0 +1,11 @@
package fr.altarik.toolbox.task;
public interface SendTaskWorkerI {
/**
* Internal use for scheduler, do not use.
* Scheduler use this method to send the task to execute to worker
* @param task task to execute now
*/
void sendTask(AltarikRunnable task);
}

View File

@ -1,4 +1,4 @@
package fr.altarik.toolbox.task.sync; package fr.altarik.toolbox.task;
import net.fabricmc.fabric.api.event.lifecycle.v1.ServerTickEvents; import net.fabricmc.fabric.api.event.lifecycle.v1.ServerTickEvents;
import net.minecraft.server.MinecraftServer; import net.minecraft.server.MinecraftServer;

View File

@ -1,15 +1,50 @@
package fr.altarik.toolbox.task; package fr.altarik.toolbox.task;
import fr.altarik.toolbox.task.async.AsyncPeriodicTasks;
import fr.altarik.toolbox.task.async.AsyncTaskI;
import fr.altarik.toolbox.task.async.AsyncTasks; import fr.altarik.toolbox.task.async.AsyncTasks;
import fr.altarik.toolbox.task.sync.PeriodicSyncTask;
import net.fabricmc.api.ModInitializer; import net.fabricmc.api.ModInitializer;
public class Task implements ModInitializer { public class Task implements ModInitializer {
public TaskI asyncWorkers = AsyncTasks.initialize(); public final TaskI asyncWorkers = AsyncTasks.initialize();
public final PeriodicTaskI periodicSyncTask = PeriodicSyncTask.initialize();
public final AsyncTaskI asyncTasks = AsyncTasks.initialize();
public final PeriodicTaskI periodicAsyncTask = AsyncPeriodicTasks.initialize();
@Override @Override
public void onInitialize() { public void onInitialize() {
/* try {
asyncWorkers.addTask(new AltarikRunnable() {
@Override
public void run() {
System.out.println("Hello world 1");
}
});
periodicSyncTask.addTask(new AltarikRunnable() {
@Override
public void run() {
System.out.println("Hello world 2");
}
}, 40, 60);
asyncTasks.addTask(new AltarikRunnable() {
@Override
public void run() {
System.out.println("Hello world 3 : " + Thread.currentThread().getName());
}
});
periodicAsyncTask.addTask(new AltarikRunnable() {
@Override
public void run() {
System.out.println("Hello world 4 : " + Thread.currentThread().getName());
}
}, 60, 80);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} */
} }
public TaskI getAsyncWorkers() { public TaskI getAsyncWorkers() {

View File

@ -2,6 +2,11 @@ package fr.altarik.toolbox.task;
public interface TaskI { public interface TaskI {
/**
* Send task to worker, execution depends on implementation
* @param function task you send to worker
* @throws InterruptedException used by asynchronous workers if threads has been interrupted or shutdown
*/
void addTask(AltarikRunnable function) throws InterruptedException; void addTask(AltarikRunnable function) throws InterruptedException;
} }

View File

@ -1,44 +1,34 @@
package fr.altarik.toolbox.task.async; package fr.altarik.toolbox.task.async;
import fr.altarik.toolbox.task.AltarikRunnable; import fr.altarik.toolbox.task.*;
import fr.altarik.toolbox.task.PeriodicTaskI;
import it.unimi.dsi.fastutil.ints.IntComparators;
import java.util.ArrayList; import java.util.Stack;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI { /**
* A task manager to execute periodic tasks asynchronously. A scheduler on the main/server thread will send the task to
* worker threads.
*/
public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI, SendTaskWorkerI {
private final ExecutorService worker; private final ExecutorService worker;
private final List<TaskScheduler> schedulers; private final Stack<SchedulerTaskData> tasks;
protected final Scheduler scheduler;
private final ServerTickListener listener;
private AsyncPeriodicTasks(int numberOfWorker) { private AsyncPeriodicTasks(int numberOfWorker) {
int size = 0;
if(numberOfWorker == 1) { if(numberOfWorker == 1) {
worker = Executors.newSingleThreadExecutor(); worker = Executors.newSingleThreadExecutor();
size = 1;
} else if (numberOfWorker <= 0) { } else if (numberOfWorker <= 0) {
worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
size = Runtime.getRuntime().availableProcessors();
} else { } else {
worker = Executors.newFixedThreadPool(numberOfWorker); worker = Executors.newFixedThreadPool(numberOfWorker);
size = numberOfWorker;
}
this.schedulers = new ArrayList<>(size);
for(int i = 0; i < size; i++) {
TaskScheduler scheduler = new TaskScheduler();
schedulers.add(scheduler);
worker.submit(() -> {
try {
scheduler.asyncRunnerPeriodicTasks();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} }
tasks = new Stack<>();
this.scheduler = new Scheduler(this, tasks);
this.listener = new ServerTickListener(scheduler);
} }
/** /**
@ -55,25 +45,37 @@ public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI {
return initialize(Runtime.getRuntime().availableProcessors()); return initialize(Runtime.getRuntime().availableProcessors());
} }
/**
* Send the task to the scheduler, the task is executed at the next server tick and at every following tick
* @param function the function which will be executed
* @throws InterruptedException if worker has terminated or is shutting down
*/
@Override @Override
public void addTask(AltarikRunnable function) throws InterruptedException { public void addTask(AltarikRunnable function) throws InterruptedException {
this.addTask(function, 0, 1000); this.addTask(function, 0, 1);
} }
/**
* Send the task to the scheduler, executed depending on the parameters (delay and period)
* @param function the function to execute
* @param delay delay in tick before starting the task
* @param period time in tick to wait between runs
* @throws InterruptedException if worker has terminated or is shutting down
*/
@Override @Override
public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException { public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException {
if(worker.isTerminated() || worker.isShutdown()) { if(worker.isTerminated() || worker.isShutdown()) {
throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task"); throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task");
} }
schedulers.stream() tasks.add(new SchedulerTaskData(function, delay, period - 1));
.min((o1, o2) -> IntComparators.NATURAL_COMPARATOR.compare(o1.getNumberOfTasks(), o2.getNumberOfTasks()))
.orElseThrow()
.sendAsyncTask(function, delay, period);
} }
/**
* Try to execute task you already send in 10 seconds, otherwise workers are killed.
* @throws AsyncTasks.UnfinishedTasksException if workers has been shutdown before finishing every tasks
*/
@Override @Override
public void close() throws Exception { public void close() throws Exception {
schedulers.forEach(s -> s.setStop(true));
worker.shutdown(); worker.shutdown();
boolean result = worker.awaitTermination(10, TimeUnit.SECONDS); boolean result = worker.awaitTermination(10, TimeUnit.SECONDS);
if(!result) { if(!result) {
@ -81,4 +83,9 @@ public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI {
throw new AsyncTasks.UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce"); throw new AsyncTasks.UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce");
} }
} }
@Override
public void sendTask(AltarikRunnable task) {
worker.submit(task);
}
} }

View File

@ -1,67 +0,0 @@
package fr.altarik.toolbox.task.async;
import fr.altarik.toolbox.task.AltarikRunnable;
import fr.altarik.toolbox.task.SchedulerTaskData;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Vector;
public class TaskScheduler {
private Vector<SchedulerTaskData> asyncTasks;
/**
* Return last time the method was executed, Value initialized to now when using sending the task to scheduler
*/
private HashMap<SchedulerTaskData, Instant> lastTimeExecution;
private boolean stop = false;
public synchronized void sendAsyncTask(AltarikRunnable function, long delay, long period) throws InterruptedException {
SchedulerTaskData data = new SchedulerTaskData(function, delay, period);
asyncTasks.addElement(data);
lastTimeExecution.put(new SchedulerTaskData(function, delay, period), Instant.now());
notify();
}
/**
* Function executed in asynchronous workers with periodic tasks
*/
public synchronized void asyncRunnerPeriodicTasks() throws InterruptedException {
loop: while(true) {
while(asyncTasks.size() == 0) {
if(isStop()) {
break loop;
}
wait();
}
SchedulerTaskData data = asyncTasks.firstElement();
asyncTasks.remove(data);
if(!data.getFunction().isCancelled()) {
long currentDelay = data.getCurrentDelay();
Instant currentTime = Instant.now();
Instant lastExecution = lastTimeExecution.get(data);
// (lastExec + delay) - currentTime
if(lastExecution.plus(currentDelay, ChronoUnit.MILLIS).isBefore(currentTime)) {
data.getFunction().run();
data.setCurrentDelay(data.getPeriod());
lastTimeExecution.put(data, Instant.now());
asyncTasks.addElement(data);
}
}
}
}
public synchronized boolean isStop() {
return stop;
}
public synchronized void setStop(boolean stop) {
this.stop = stop;
}
public int getNumberOfTasks() {
return asyncTasks.size();
}
}

View File

@ -1,20 +1,20 @@
package fr.altarik.toolbox.task.sync; package fr.altarik.toolbox.task.sync;
import fr.altarik.toolbox.task.AltarikRunnable; import fr.altarik.toolbox.task.*;
import fr.altarik.toolbox.task.PeriodicTaskI;
import fr.altarik.toolbox.task.SchedulerTaskData;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class PeriodicSyncTask implements PeriodicTaskI, Runnable { public class PeriodicSyncTask implements PeriodicTaskI, SendTaskWorkerI {
private final ServerTickListener listener; private final ServerTickListener listener;
private final List<SchedulerTaskData> tasks; private final List<SchedulerTaskData> tasks;
protected final Scheduler scheduler;
private PeriodicSyncTask() { private PeriodicSyncTask() {
this.listener = new ServerTickListener(this);
this.tasks = new ArrayList<>(2); this.tasks = new ArrayList<>(2);
this.scheduler = new Scheduler(this, tasks);
this.listener = new ServerTickListener(scheduler);
} }
@ -27,30 +27,13 @@ public class PeriodicSyncTask implements PeriodicTaskI, Runnable {
addTask(function, 0, 1); addTask(function, 0, 1);
} }
@Override
public void run() {
List<SchedulerTaskData> removeList = new ArrayList<>(tasks.size());
for(SchedulerTaskData data : tasks) {
if(!data.getFunction().isCancelled()) {
long currentDelay = data.getCurrentDelay();
if(currentDelay != 0) {
data.setCurrentDelay(currentDelay - 1);
} else {
data.getFunction().run();
data.setCurrentDelay(data.getPeriod());
}
} else {
removeList.add(data);
}
}
for(SchedulerTaskData toRemove : removeList) {
tasks.remove(toRemove);
}
}
@Override @Override
public void addTask(AltarikRunnable function, long delay, long period) { public void addTask(AltarikRunnable function, long delay, long period) {
tasks.add(new SchedulerTaskData(function, delay, period - 1)); tasks.add(new SchedulerTaskData(function, delay, period - 1));
} }
@Override
public void sendTask(AltarikRunnable task) {
task.run();
}
} }

View File

@ -1,20 +1,20 @@
package fr.altarik.toolbox.task.sync; package fr.altarik.toolbox.task.sync;
import fr.altarik.toolbox.task.AltarikRunnable; import fr.altarik.toolbox.task.*;
import fr.altarik.toolbox.task.SchedulerTaskData;
import fr.altarik.toolbox.task.TaskI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class SyncTask implements TaskI, Runnable { public class SyncTask implements TaskI, SendTaskWorkerI {
private final ServerTickListener listener; private final ServerTickListener listener;
private final List<SchedulerTaskData> tasks; private final List<SchedulerTaskData> tasks;
protected final Scheduler scheduler;
private SyncTask() { private SyncTask() {
this.listener = new ServerTickListener(this);
this.tasks = new ArrayList<>(2); this.tasks = new ArrayList<>(2);
this.scheduler = new Scheduler(this, tasks);
this.listener = new ServerTickListener(scheduler);
} }
public static TaskI initialize() { public static TaskI initialize() {
@ -27,27 +27,11 @@ public class SyncTask implements TaskI, Runnable {
} }
public void addTask(AltarikRunnable function, int delay) { public void addTask(AltarikRunnable function, int delay) {
tasks.add(new SchedulerTaskData(function, delay, 0)); tasks.add(new SchedulerTaskData(function, delay, -1));
} }
@Override @Override
public void run() { public void sendTask(AltarikRunnable task) {
List<SchedulerTaskData> removeList = new ArrayList<>(tasks.size()); task.run();
for(SchedulerTaskData data : tasks) {
if(!data.getFunction().isCancelled()) {
long currentDelay = data.getCurrentDelay();
if(currentDelay != 0) {
data.setCurrentDelay(currentDelay - 1);
} else {
data.getFunction().run();
removeList.add(data);
}
} else {
removeList.add(data);
}
}
for(SchedulerTaskData toRemove : removeList) {
tasks.remove(toRemove);
}
} }
} }

View File

@ -27,8 +27,7 @@
"depends": { "depends": {
"fabricloader": ">=0.14.12", "fabricloader": ">=0.14.12",
"fabric": "*", "fabric": "*",
"minecraft": "1.19.3", "minecraft": "1.19.3"
"npcs": "2.0.1-SNAPSHOT"
} }
} }

View File

@ -1,6 +1,7 @@
package fr.altarik.toolbox.task.async; package fr.altarik.toolbox.task.async;
import fr.altarik.toolbox.task.AltarikRunnable; import fr.altarik.toolbox.task.AltarikRunnable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Stack; import java.util.Stack;
@ -9,24 +10,48 @@ import java.util.concurrent.atomic.AtomicInteger;
public class PeriodicAsyncTaskTest { public class PeriodicAsyncTaskTest {
@Test @Test
void testPeriodicASyncTask() { void testPeriodicASyncTask() throws Exception {
AsyncPeriodicTasks worker = (AsyncPeriodicTasks) AsyncPeriodicTasks.initialize(); AsyncPeriodicTasks worker = (AsyncPeriodicTasks) AsyncPeriodicTasks.initialize(1);
Stack<AtomicInteger> results = new Stack<>(); Stack<AtomicInteger> results = new Stack<>();
AtomicInteger value1 = new AtomicInteger(1); AtomicInteger value1 = new AtomicInteger(1);
AtomicInteger value2 = new AtomicInteger(2); AtomicInteger value2 = new AtomicInteger(2);
AltarikRunnable runnable1 = new AltarikRunnable() { AltarikRunnable runnable1 = new AltarikRunnable() {
private int i = 0;
@Override @Override
public void run() { public void run() {
results.add(value1); results.add(value1);
i++;
if(i == 2)
cancel();
} }
}; };
AltarikRunnable runnable2 = new AltarikRunnable() { AltarikRunnable runnable2 = new AltarikRunnable() {
private int i = 0;
@Override @Override
public void run() { public void run() {
results.add(value2); results.add(value2);
i++;
if(i == 4)
cancel();
} }
}; };
worker.addTask(runnable1, 1, 4);
worker.addTask(runnable2, 0, 2);
for(int i = 0; i < 10; i++) {
worker.scheduler.run();
}
AtomicInteger[] expected = {
value2,
value1,
value2,
value2,
value1,
value2,
value2,
value1
};
worker.close();
Assertions.assertArrayEquals(expected, results.toArray());
} }
} }

View File

@ -33,7 +33,7 @@ class PeriodicSyncTaskTest {
} }
}); });
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
worker.run(); worker.scheduler.run();
} }
AtomicInteger[] expected = { AtomicInteger[] expected = {
value2, value2,

View File

@ -29,14 +29,14 @@ class SyncTaskTest {
} }
}; };
worker.addTask(task1); worker.addTask(task1);
worker.run(); worker.scheduler.run();
worker.run(); worker.scheduler.run();
worker.addTask(task2); worker.addTask(task2);
worker.addTask(task1); worker.addTask(task1);
worker.addTask(task2); worker.addTask(task2);
worker.run(); worker.scheduler.run();
worker.addTask(task1); worker.addTask(task1);
worker.run(); worker.scheduler.run();
AtomicInteger[] expected = { AtomicInteger[] expected = {
value1, value1,
value2, value2,