summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/blockvisitor/WorkDispatcher.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorkDispatcher.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/blockvisitor/WorkDispatcher.kt345
1 files changed, 345 insertions, 0 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorkDispatcher.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorkDispatcher.kt
new file mode 100644
index 0000000..7201a06
--- /dev/null
+++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorkDispatcher.kt
@@ -0,0 +1,345 @@
+package io.dico.parcels2.blockvisitor
+
+import io.dico.parcels2.ParcelsPlugin
+import io.dico.parcels2.logger
+import io.dico.parcels2.util.ext.clampMin
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.CoroutineStart.LAZY
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.launch
+import org.bukkit.scheduler.BukkitTask
+import java.lang.System.currentTimeMillis
+import java.util.LinkedList
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
+import kotlin.coroutines.resume
+
+typealias WorkerTask = suspend WorkerScope.() -> Unit
+typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit
+
+data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
+
+interface WorkDispatcher {
+ /**
+ * Submit a [task] that should be run synchronously, but limited such that it does not stall the server
+ * a bunch
+ */
+ fun dispatch(task: WorkerTask): Worker
+
+ /**
+ * Get a list of all workers
+ */
+ val workers: List<Worker>
+
+ /**
+ * Attempts to complete any remaining tasks immediately, without suspension.
+ */
+ fun completeAllTasks()
+}
+
+interface WorkerAndScopeMembersUnion {
+ /**
+ * The time that elapsed since this worker was dispatched, in milliseconds
+ */
+ val elapsedTime: Long
+
+ /**
+ * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0
+ * with no guarantees to its accuracy.
+ */
+ val progress: Double
+}
+
+interface Worker : WorkerAndScopeMembersUnion {
+ /**
+ * The coroutine associated with this worker
+ */
+ val job: Job
+
+ /**
+ * true if this worker has completed
+ */
+ val isComplete: Boolean
+
+ /**
+ * If an exception was thrown during the execution of this task,
+ * returns that exception. Returns null otherwise.
+ */
+ val completionException: Throwable?
+
+ /**
+ * Calls the given [block] whenever the progress of this worker is updated,
+ * if [minInterval] milliseconds expired since the last call.
+ * The first call occurs after at least [minDelay] milliseconds in a likewise manner.
+ * Repeated invocations of this method result in an [IllegalStateException]
+ *
+ * if [asCompletionListener] is true, [onCompleted] is called with the same [block]
+ */
+ fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: WorkerUpdateLister): Worker
+
+ /**
+ * Calls the given [block] when this worker completes, with the progress value 1.0.
+ * Multiple listeners may be registered to this function.
+ */
+ fun onCompleted(block: WorkerUpdateLister): Worker
+
+ /**
+ * Await completion of this worker
+ */
+ suspend fun awaitCompletion()
+
+ /**
+ * An object attached to this worker
+ */
+ //val attachment: Any?
+}
+
+interface WorkerScope : WorkerAndScopeMembersUnion {
+ /**
+ * A task should call this frequently during its execution, such that the timer can suspend it when necessary.
+ */
+ suspend fun markSuspensionPoint()
+
+ /**
+ * A task should call this method to indicate its progress
+ */
+ fun setProgress(progress: Double)
+
+ /**
+ * Indicate that this job is complete
+ */
+ fun markComplete() = setProgress(1.0)
+
+ /**
+ * Get a [WorkerScope] that is responsible for [portion] part of the progress
+ * If [portion] is negative, the remainder of the progress is used
+ */
+ fun delegateWork(portion: Double = -1.0): WorkerScope
+}
+
+inline fun <T> WorkerScope.delegateWork(portion: Double = -1.0, block: WorkerScope.() -> T): T {
+ delegateWork(portion).apply {
+ val result = block()
+ markComplete()
+ return result
+ }
+}
+
+interface WorkerInternal : Worker, WorkerScope {
+ /**
+ * Start or resumes the execution of this worker
+ * and returns true if the worker completed
+ *
+ * [worktime] is the maximum amount of time, in milliseconds,
+ * that this job may run for until suspension.
+ *
+ * If [worktime] is not positive, the worker will complete
+ * without suspension and this method will always return true.
+ */
+ fun resume(worktime: Long): Boolean
+}
+
+/**
+ * An object that controls one or more jobs, ensuring that they don't stall the server too much.
+ * There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick
+ * This object attempts to split that maximum amount of milliseconds equally between all jobs
+ */
+class BukkitWorkDispatcher(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorkDispatcher {
+ // The currently registered bukkit scheduler task
+ private var bukkitTask: BukkitTask? = null
+ // The workers.
+ private val _workers = LinkedList<WorkerInternal>()
+ override val workers: List<Worker> = _workers
+
+ override fun dispatch(task: WorkerTask): Worker {
+ val worker: WorkerInternal = WorkerImpl(plugin, task)
+
+ if (bukkitTask == null) {
+ val completed = worker.resume(options.workTime.toLong())
+ if (completed) return worker
+ bukkitTask = plugin.scheduleRepeating(0, options.tickInterval) { tickJobs() }
+ }
+ _workers.addFirst(worker)
+ return worker
+ }
+
+ private fun tickJobs() {
+ val workers = _workers
+ if (workers.isEmpty()) return
+ val tickStartTime = System.currentTimeMillis()
+
+ val iterator = workers.listIterator(index = 0)
+ while (iterator.hasNext()) {
+ val time = System.currentTimeMillis()
+ val timeElapsed = time - tickStartTime
+ val timeLeft = options.workTime - timeElapsed
+ if (timeLeft <= 0) return
+
+ val count = workers.size - iterator.nextIndex()
+ val timePerJob = (timeLeft + count - 1) / count
+ val worker = iterator.next()
+ val completed = worker.resume(timePerJob)
+ if (completed) {
+ iterator.remove()
+ }
+ }
+
+ if (workers.isEmpty()) {
+ bukkitTask?.cancel()
+ bukkitTask = null
+ }
+ }
+
+ override fun completeAllTasks() {
+ _workers.forEach {
+ it.resume(-1)
+ }
+ _workers.clear()
+ bukkitTask?.cancel()
+ bukkitTask = null
+ }
+
+}
+
+private class WorkerImpl(scope: CoroutineScope, task: WorkerTask) : WorkerInternal {
+ override val job: Job = scope.launch(start = LAZY) { task() }
+
+ private var continuation: Continuation<Unit>? = null
+ private var nextSuspensionTime: Long = 0L
+ private var completeForcefully = false
+ private var isStarted = false
+
+ override val elapsedTime
+ get() =
+ if (job.isCompleted) startTimeOrElapsedTime
+ else currentTimeMillis() - startTimeOrElapsedTime
+
+ override val isComplete get() = job.isCompleted
+
+ private var _progress = 0.0
+ override val progress get() = _progress
+ override var completionException: Throwable? = null; private set
+
+ private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise
+ private var onProgressUpdate: WorkerUpdateLister? = null
+ private var progressUpdateInterval: Int = 0
+ private var lastUpdateTime: Long = 0L
+ private var onCompleted: WorkerUpdateLister? = null
+
+ init {
+ job.invokeOnCompletion { exception ->
+ // report any error that occurred
+ completionException = exception?.also {
+ if (it !is CancellationException)
+ logger.error("WorkerTask generated an exception", it)
+ }
+
+ // convert to elapsed time here
+ startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime
+ onCompleted?.let { it(1.0, elapsedTime) }
+
+ onCompleted = null
+ onProgressUpdate = { prog, el -> }
+ }
+ }
+
+ override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker {
+ onProgressUpdate?.let { throw IllegalStateException() }
+ if (asCompletionListener) onCompleted(block)
+ if (isComplete) return this
+ onProgressUpdate = block
+ progressUpdateInterval = minInterval
+ lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval
+
+ return this
+ }
+
+ override fun onCompleted(block: WorkerUpdateLister): Worker {
+ if (isComplete) {
+ block(1.0, startTimeOrElapsedTime)
+ return this
+ }
+
+ val cur = onCompleted
+ onCompleted = if (cur == null) {
+ block
+ } else {
+ fun Worker.(prog: Double, el: Long) {
+ cur(prog, el)
+ block(prog, el)
+ }
+ }
+ return this
+ }
+
+ override suspend fun markSuspensionPoint() {
+ if (System.currentTimeMillis() >= nextSuspensionTime && !completeForcefully)
+ suspendCoroutineUninterceptedOrReturn { cont: Continuation<Unit> ->
+ continuation = cont
+ COROUTINE_SUSPENDED
+ }
+ }
+
+ override fun setProgress(progress: Double) {
+ this._progress = progress
+ val onProgressUpdate = onProgressUpdate ?: return
+ val time = System.currentTimeMillis()
+ if (time > lastUpdateTime + progressUpdateInterval) {
+ onProgressUpdate(progress, elapsedTime)
+ lastUpdateTime = time
+ }
+ }
+
+ override fun resume(worktime: Long): Boolean {
+ if (isComplete) return true
+
+ if (worktime > 0) {
+ nextSuspensionTime = currentTimeMillis() + worktime
+ } else {
+ completeForcefully = true
+ }
+
+ if (isStarted) {
+ continuation?.let {
+ continuation = null
+ it.resume(Unit)
+ return continuation == null
+ }
+ return true
+ }
+
+ isStarted = true
+ startTimeOrElapsedTime = System.currentTimeMillis()
+ job.start()
+
+ return continuation == null
+ }
+
+ override suspend fun awaitCompletion() {
+ job.join()
+ }
+
+ private fun delegateWork(curPortion: Double, portion: Double): WorkerScope =
+ DelegateScope(progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0))
+
+ override fun delegateWork(portion: Double): WorkerScope = delegateWork(1.0, portion)
+
+ private inner class DelegateScope(val progressStart: Double, val portion: Double) : WorkerScope {
+ override val elapsedTime: Long
+ get() = this@WorkerImpl.elapsedTime
+
+ override suspend fun markSuspensionPoint() =
+ this@WorkerImpl.markSuspensionPoint()
+
+ override val progress: Double
+ get() = (this@WorkerImpl.progress - progressStart) / portion
+
+ override fun setProgress(progress: Double) =
+ this@WorkerImpl.setProgress(progressStart + progress * portion)
+
+ override fun delegateWork(portion: Double): WorkerScope =
+ this@WorkerImpl.delegateWork(this.portion, portion)
+ }
+}