Added AsyncPeriodicTasks, improved a bit PeriodicSyncTask (still a bit of work to do)

Signed-off-by: Quentin Legot <legotquentin@gmail.com>
This commit is contained in:
Quentin Legot 2023-02-03 17:51:16 +01:00
parent 863952b120
commit b1ee9344b8
7 changed files with 192 additions and 9 deletions

View File

@ -0,0 +1,4 @@
package fr.altarik.toolbox.task;
public interface AsyncTaskI extends TaskI, AutoCloseable {
}

View File

@ -1,6 +1,6 @@
package fr.altarik.toolbox.task;
public interface TaskI extends AutoCloseable {
public interface TaskI {
public void addTask(AltarikRunnable function) throws InterruptedException;

View File

@ -0,0 +1,56 @@
package fr.altarik.toolbox.task;
import fr.altarik.toolbox.task.syncTasks.SchedulerTaskData;
import java.util.Vector;
public class TaskScheduler {
private Vector<SchedulerTaskData> asyncTasks;
private boolean stop = false;
public synchronized void sendAsyncTask(AltarikRunnable function, long delay, long period) throws InterruptedException {
asyncTasks.addElement(new SchedulerTaskData(function, delay, period));
notify();
}
/**
* Function executed in asynchronous workers with periodic tasks
*/
public synchronized void asyncRunnerPeriodicTasks() throws InterruptedException {
loop: while(true) {
notify();
while(asyncTasks.size() == 0) {
if(isStop()) {
break loop;
}
wait();
}
SchedulerTaskData data = asyncTasks.firstElement();
asyncTasks.remove(data);
if(!data.getFunction().isCancelled()) {
long currentDelay = data.getCurrentDelay();
if(currentDelay != 0) {
data.setCurrentDelay(currentDelay - 1);
asyncTasks.addElement(data);
} else {
data.getFunction().run();
data.setCurrentDelay(data.getPeriod());
asyncTasks.addElement(data);
}
}
}
}
public synchronized boolean isStop() {
return stop;
}
public synchronized void setStop(boolean stop) {
this.stop = stop;
}
public int getNumberOfTasks() {
return asyncTasks.size();
}
}

View File

@ -0,0 +1,83 @@
package fr.altarik.toolbox.task.asyncTasks;
import fr.altarik.toolbox.task.*;
import it.unimi.dsi.fastutil.ints.IntComparators;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AsyncPeriodicTasks implements PeriodicTaskI, AsyncTaskI {
private final ExecutorService worker;
private final List<TaskScheduler> schedulers;
private AsyncPeriodicTasks(int numberOfWorker) {
int size = 0;
if(numberOfWorker == 1) {
worker = Executors.newSingleThreadExecutor();
size = 1;
} else if (numberOfWorker <= 0) {
worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
size = Runtime.getRuntime().availableProcessors();
} else {
worker = Executors.newFixedThreadPool(numberOfWorker);
size = numberOfWorker;
}
this.schedulers = new ArrayList<>(size);
for(int i = 0; i < size; i++) {
TaskScheduler scheduler = new TaskScheduler();
schedulers.add(scheduler);
worker.submit(() -> {
try {
scheduler.asyncRunnerPeriodicTasks();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
/**
* Call this method at startup or before first use of {@link AsyncTasks#addTask(AltarikRunnable)}, cause without it, nothing will work
* This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed.
*
* @return an instance of AsyncTasks
*/
public static TaskI initialize(int numberOfWorker) {
return new AsyncPeriodicTasks(numberOfWorker);
}
public static TaskI initialize() {
return initialize(Runtime.getRuntime().availableProcessors());
}
@Override
public void addTask(AltarikRunnable function) throws InterruptedException {
this.addTask(function, 0, 1);
}
@Override
public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException {
if(worker.isTerminated() || worker.isShutdown()) {
throw new InterruptedException("Worker has been terminated or shutdown, it's impossible to add new task");
}
schedulers.stream()
.min((o1, o2) -> IntComparators.NATURAL_COMPARATOR.compare(o1.getNumberOfTasks(), o2.getNumberOfTasks()))
.orElseThrow()
.sendAsyncTask(function, delay, period);
}
@Override
public void close() throws Exception {
schedulers.forEach(s -> s.setStop(true));
worker.shutdown();
boolean result = worker.awaitTermination(10, TimeUnit.SECONDS);
if(!result) {
worker.shutdownNow();
throw new AsyncTasks.UnfinishedTasksException("Tasks take too many time to finish, shutdown has been enforce");
}
}
}

View File

@ -18,14 +18,14 @@ public class AsyncTasks implements TaskI {
if(numberOfWorker == 1) {
worker = Executors.newSingleThreadExecutor();
} else if (numberOfWorker <= 0) {
worker = Executors.newCachedThreadPool();
worker = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
} else {
worker = Executors.newFixedThreadPool(numberOfWorker);
}
}
/**
* Call this method at startup or before first use of {@link AsyncTasks#addTask(Runnable)}, cause without it, nothing will work
* Call this method at startup or before first use of {@link AsyncTasks#addTask(AltarikRunnable)}, cause without it, nothing will work
* This method declare worker thread and start it, without call it, by calling addTask(Runnable), it'll add your task to Queue, but tasks will never be consumed.
*
* @return an instance of AsyncTasks

View File

@ -1,12 +1,13 @@
package fr.altarik.toolbox.task.syncTasks;
import fr.altarik.toolbox.task.AltarikRunnable;
import fr.altarik.toolbox.task.PeriodicTaskI;
import fr.altarik.toolbox.task.TaskI;
import java.util.ArrayList;
import java.util.List;
public class PeriodicSyncTask implements TaskI, Runnable {
public class PeriodicSyncTask implements PeriodicTaskI, Runnable {
private ServerTickListener listener;
private List<AltarikRunnable> tasks;
@ -26,11 +27,6 @@ public class PeriodicSyncTask implements TaskI, Runnable {
tasks.add(function);
}
@Override
public void close() throws Exception {
}
@Override
public void run() {
List<AltarikRunnable> removeList = new ArrayList<>(tasks.size());
@ -44,4 +40,9 @@ public class PeriodicSyncTask implements TaskI, Runnable {
}
tasks.removeAll(removeList);
}
@Override
public void addTask(AltarikRunnable function, long delay, long period) throws InterruptedException {
}
}

View File

@ -0,0 +1,39 @@
package fr.altarik.toolbox.task.syncTasks;
import fr.altarik.toolbox.task.AltarikRunnable;
public class SchedulerTaskData {
private final long delay;
private final long period;
private final AltarikRunnable function;
private long currentDelay;
public SchedulerTaskData(AltarikRunnable function, long delay, long period) {
this.function = function;
this.delay = delay;
this.period = period;
this.currentDelay = delay;
}
public AltarikRunnable getFunction() {
return function;
}
public long getCurrentDelay() {
return currentDelay;
}
public void setCurrentDelay(long currentDelay) {
this.currentDelay = currentDelay;
}
public long getDelay() {
return delay;
}
public long getPeriod() {
return period;
}
}