diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt')
-rw-r--r-- | src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt | 64 |
1 files changed, 43 insertions, 21 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt index 45196f2..30eaabd 100644 --- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt +++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt @@ -1,11 +1,12 @@ package io.dico.parcels2.blockvisitor -import kotlinx.coroutines.experimental.* -import org.bukkit.plugin.Plugin +import io.dico.parcels2.ParcelsPlugin +import io.dico.parcels2.util.FunctionHelper +import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.Job import org.bukkit.scheduler.BukkitTask import java.lang.System.currentTimeMillis -import java.util.* -import java.util.concurrent.Executor +import java.util.LinkedList import java.util.logging.Level import kotlin.coroutines.experimental.Continuation import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED @@ -27,6 +28,11 @@ sealed class WorktimeLimiter { * Get a list of all workers */ abstract val workers: List<Worker> + + /** + * Attempts to complete any remaining tasks immediately, without suspension. + */ + abstract fun completeAllTasks() } interface Timed { @@ -90,8 +96,14 @@ interface WorkerScope : Timed { private interface WorkerContinuation : Worker, WorkerScope { /** - * Start or resume the execution of this worker - * returns true if the worker completed + * Start or resumes the execution of this worker + * and returns true if the worker completed + * + * [worktime] is the maximum amount of time, in milliseconds, + * that this job may run for until suspension. + * + * If [worktime] is not positive, the worker will complete + * without suspension and this method will always return true. */ fun resume(worktime: Long): Boolean } @@ -101,19 +113,17 @@ private interface WorkerContinuation : Worker, WorkerScope { * There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick * This object attempts to split that maximum amount of milliseconds equally between all jobs */ -class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeOptions) : WorktimeLimiter() { - // Coroutine dispatcher for jobs - private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher() +class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorktimeLimiter() { // The currently registered bukkit scheduler task private var bukkitTask: BukkitTask? = null // The workers. - private var _workers = LinkedList<WorkerContinuation>() + private val _workers = LinkedList<WorkerContinuation>() override val workers: List<Worker> = _workers override fun submit(task: TimeLimitedTask): Worker { - val worker: WorkerContinuation = WorkerImpl(plugin, dispatcher, task) + val worker: WorkerContinuation = WorkerImpl(plugin.functionHelper, task) _workers.addFirst(worker) - if (bukkitTask == null) bukkitTask = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong()) + if (bukkitTask == null) bukkitTask = plugin.functionHelper.scheduleRepeating(0, options.tickInterval) { tickJobs() } return worker } @@ -144,10 +154,18 @@ class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeO } } + override fun completeAllTasks() { + _workers.forEach { + it.resume(-1) + } + _workers.clear() + bukkitTask?.cancel() + bukkitTask = null + } + } -private class WorkerImpl(val plugin: Plugin, - val dispatcher: CoroutineDispatcher, +private class WorkerImpl(val functionHelper: FunctionHelper, val task: TimeLimitedTask) : WorkerContinuation { override var job: Job? = null; private set @@ -170,6 +188,7 @@ private class WorkerImpl(val plugin: Plugin, private var onCompleted: WorkerUpdateLister? = null private var continuation: Continuation<Unit>? = null private var nextSuspensionTime: Long = 0L + private var completeForcefully = false private fun initJob(job: Job) { this.job?.let { throw IllegalStateException() } @@ -179,7 +198,7 @@ private class WorkerImpl(val plugin: Plugin, // report any error that occurred completionException = exception?.also { if (it !is CancellationException) - plugin.logger.log(Level.SEVERE, "TimeLimitedTask for plugin ${plugin.name} generated an exception", it) + functionHelper.plugin.logger.log(Level.SEVERE, "TimeLimitedTask for plugin ${functionHelper.plugin.name} generated an exception", it) } // convert to elapsed time here @@ -204,7 +223,7 @@ private class WorkerImpl(val plugin: Plugin, } override suspend fun markSuspensionPoint() { - if (System.currentTimeMillis() >= nextSuspensionTime) + if (System.currentTimeMillis() >= nextSuspensionTime && !completeForcefully) suspendCoroutineUninterceptedOrReturn { cont: Continuation<Unit> -> continuation = cont COROUTINE_SUSPENDED @@ -222,7 +241,11 @@ private class WorkerImpl(val plugin: Plugin, } override fun resume(worktime: Long): Boolean { - nextSuspensionTime = currentTimeMillis() + worktime + if (worktime > 0) { + nextSuspensionTime = currentTimeMillis() + worktime + } else { + completeForcefully = true + } continuation?.let { continuation = null @@ -236,10 +259,9 @@ private class WorkerImpl(val plugin: Plugin, } try { - launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { - initJob(job = kotlin.coroutines.experimental.coroutineContext[Job]!!) - task() - } + val job = functionHelper.launchLazilyOnMainThread { task() } + initJob(job = job) + job.start() } catch (t: Throwable) { // do nothing: handled by job.invokeOnCompletion() } |