summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt64
1 files changed, 43 insertions, 21 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
index 45196f2..30eaabd 100644
--- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
+++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
@@ -1,11 +1,12 @@
package io.dico.parcels2.blockvisitor
-import kotlinx.coroutines.experimental.*
-import org.bukkit.plugin.Plugin
+import io.dico.parcels2.ParcelsPlugin
+import io.dico.parcels2.util.FunctionHelper
+import kotlinx.coroutines.experimental.CancellationException
+import kotlinx.coroutines.experimental.Job
import org.bukkit.scheduler.BukkitTask
import java.lang.System.currentTimeMillis
-import java.util.*
-import java.util.concurrent.Executor
+import java.util.LinkedList
import java.util.logging.Level
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
@@ -27,6 +28,11 @@ sealed class WorktimeLimiter {
* Get a list of all workers
*/
abstract val workers: List<Worker>
+
+ /**
+ * Attempts to complete any remaining tasks immediately, without suspension.
+ */
+ abstract fun completeAllTasks()
}
interface Timed {
@@ -90,8 +96,14 @@ interface WorkerScope : Timed {
private interface WorkerContinuation : Worker, WorkerScope {
/**
- * Start or resume the execution of this worker
- * returns true if the worker completed
+ * Start or resumes the execution of this worker
+ * and returns true if the worker completed
+ *
+ * [worktime] is the maximum amount of time, in milliseconds,
+ * that this job may run for until suspension.
+ *
+ * If [worktime] is not positive, the worker will complete
+ * without suspension and this method will always return true.
*/
fun resume(worktime: Long): Boolean
}
@@ -101,19 +113,17 @@ private interface WorkerContinuation : Worker, WorkerScope {
* There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick
* This object attempts to split that maximum amount of milliseconds equally between all jobs
*/
-class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeOptions) : WorktimeLimiter() {
- // Coroutine dispatcher for jobs
- private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher()
+class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorktimeLimiter() {
// The currently registered bukkit scheduler task
private var bukkitTask: BukkitTask? = null
// The workers.
- private var _workers = LinkedList<WorkerContinuation>()
+ private val _workers = LinkedList<WorkerContinuation>()
override val workers: List<Worker> = _workers
override fun submit(task: TimeLimitedTask): Worker {
- val worker: WorkerContinuation = WorkerImpl(plugin, dispatcher, task)
+ val worker: WorkerContinuation = WorkerImpl(plugin.functionHelper, task)
_workers.addFirst(worker)
- if (bukkitTask == null) bukkitTask = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong())
+ if (bukkitTask == null) bukkitTask = plugin.functionHelper.scheduleRepeating(0, options.tickInterval) { tickJobs() }
return worker
}
@@ -144,10 +154,18 @@ class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeO
}
}
+ override fun completeAllTasks() {
+ _workers.forEach {
+ it.resume(-1)
+ }
+ _workers.clear()
+ bukkitTask?.cancel()
+ bukkitTask = null
+ }
+
}
-private class WorkerImpl(val plugin: Plugin,
- val dispatcher: CoroutineDispatcher,
+private class WorkerImpl(val functionHelper: FunctionHelper,
val task: TimeLimitedTask) : WorkerContinuation {
override var job: Job? = null; private set
@@ -170,6 +188,7 @@ private class WorkerImpl(val plugin: Plugin,
private var onCompleted: WorkerUpdateLister? = null
private var continuation: Continuation<Unit>? = null
private var nextSuspensionTime: Long = 0L
+ private var completeForcefully = false
private fun initJob(job: Job) {
this.job?.let { throw IllegalStateException() }
@@ -179,7 +198,7 @@ private class WorkerImpl(val plugin: Plugin,
// report any error that occurred
completionException = exception?.also {
if (it !is CancellationException)
- plugin.logger.log(Level.SEVERE, "TimeLimitedTask for plugin ${plugin.name} generated an exception", it)
+ functionHelper.plugin.logger.log(Level.SEVERE, "TimeLimitedTask for plugin ${functionHelper.plugin.name} generated an exception", it)
}
// convert to elapsed time here
@@ -204,7 +223,7 @@ private class WorkerImpl(val plugin: Plugin,
}
override suspend fun markSuspensionPoint() {
- if (System.currentTimeMillis() >= nextSuspensionTime)
+ if (System.currentTimeMillis() >= nextSuspensionTime && !completeForcefully)
suspendCoroutineUninterceptedOrReturn { cont: Continuation<Unit> ->
continuation = cont
COROUTINE_SUSPENDED
@@ -222,7 +241,11 @@ private class WorkerImpl(val plugin: Plugin,
}
override fun resume(worktime: Long): Boolean {
- nextSuspensionTime = currentTimeMillis() + worktime
+ if (worktime > 0) {
+ nextSuspensionTime = currentTimeMillis() + worktime
+ } else {
+ completeForcefully = true
+ }
continuation?.let {
continuation = null
@@ -236,10 +259,9 @@ private class WorkerImpl(val plugin: Plugin,
}
try {
- launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) {
- initJob(job = kotlin.coroutines.experimental.coroutineContext[Job]!!)
- task()
- }
+ val job = functionHelper.launchLazilyOnMainThread { task() }
+ initJob(job = job)
+ job.start()
} catch (t: Throwable) {
// do nothing: handled by job.invokeOnCompletion()
}