summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt84
1 files changed, 62 insertions, 22 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
index ea4db62..f735903 100644
--- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
+++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
@@ -1,38 +1,42 @@
package io.dico.parcels2.blockvisitor
import io.dico.parcels2.ParcelsPlugin
-import io.dico.parcels2.util.FunctionHelper
-import kotlinx.coroutines.experimental.CancellationException
-import kotlinx.coroutines.experimental.Job
+import io.dico.parcels2.logger
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.CoroutineStart.LAZY
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.launch
import org.bukkit.scheduler.BukkitTask
import java.lang.System.currentTimeMillis
import java.util.LinkedList
-import java.util.logging.Level
-import kotlin.coroutines.experimental.Continuation
-import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
-import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
+import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
+import kotlin.coroutines.resume
+import kotlin.coroutines.suspendCoroutine
typealias TimeLimitedTask = suspend WorkerScope.() -> Unit
typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit
data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
-sealed class WorktimeLimiter {
+interface WorktimeLimiter {
/**
* Submit a [task] that should be run synchronously, but limited such that it does not stall the server
* a bunch
*/
- abstract fun submit(task: TimeLimitedTask): Worker
+ fun submit(task: TimeLimitedTask): Worker
/**
* Get a list of all workers
*/
- abstract val workers: List<Worker>
+ val workers: List<Worker>
/**
* Attempts to complete any remaining tasks immediately, without suspension.
*/
- abstract fun completeAllTasks()
+ fun completeAllTasks()
}
interface Timed {
@@ -77,9 +81,14 @@ interface Worker : Timed {
/**
* Calls the given [block] when this worker completes, with the progress value 1.0.
- * Repeated invocations of this method result in an [IllegalStateException]
+ * Multiple listeners may be registered to this function.
*/
fun onCompleted(block: WorkerUpdateLister): Worker
+
+ /**
+ * Await completion of this worker
+ */
+ suspend fun awaitCompletion()
}
interface WorkerScope : Timed {
@@ -113,7 +122,7 @@ private interface WorkerContinuation : Worker, WorkerScope {
* There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick
* This object attempts to split that maximum amount of milliseconds equally between all jobs
*/
-class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorktimeLimiter() {
+class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorktimeLimiter {
// The currently registered bukkit scheduler task
private var bukkitTask: BukkitTask? = null
// The workers.
@@ -121,12 +130,12 @@ class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWo
override val workers: List<Worker> = _workers
override fun submit(task: TimeLimitedTask): Worker {
- val worker: WorkerContinuation = WorkerImpl(plugin.functionHelper, task)
+ val worker: WorkerContinuation = WorkerImpl(plugin, task)
if (bukkitTask == null) {
val completed = worker.resume(options.workTime.toLong())
if (completed) return worker
- bukkitTask = plugin.functionHelper.scheduleRepeating(0, options.tickInterval) { tickJobs() }
+ bukkitTask = plugin.scheduleRepeating(0, options.tickInterval) { tickJobs() }
}
_workers.addFirst(worker)
@@ -171,8 +180,10 @@ class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWo
}
-private class WorkerImpl(val functionHelper: FunctionHelper,
- val task: TimeLimitedTask) : WorkerContinuation {
+private class WorkerImpl(
+ val scope: CoroutineScope,
+ val task: TimeLimitedTask
+) : WorkerContinuation, CoroutineScope by scope {
override var job: Job? = null; private set
override val elapsedTime
@@ -204,27 +215,44 @@ private class WorkerImpl(val functionHelper: FunctionHelper,
// report any error that occurred
completionException = exception?.also {
if (it !is CancellationException)
- functionHelper.plugin.logger.log(Level.SEVERE, "TimeLimitedTask for plugin ${functionHelper.plugin.name} generated an exception", it)
+ logger.error("TimeLimitedTask generated an exception", it)
}
// convert to elapsed time here
startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime
onCompleted?.let { it(1.0, elapsedTime) }
+
+ onCompleted = null
+ onProgressUpdate = { prog, el -> }
}
}
override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker {
onProgressUpdate?.let { throw IllegalStateException() }
+ if (asCompletionListener) onCompleted(block)
+ if (isComplete) return this
onProgressUpdate = block
progressUpdateInterval = minInterval
lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval
- if (asCompletionListener) onCompleted(block)
+
return this
}
override fun onCompleted(block: WorkerUpdateLister): Worker {
- onCompleted?.let { throw IllegalStateException() }
- onCompleted = block
+ if (isComplete) {
+ block(1.0, startTimeOrElapsedTime)
+ return this
+ }
+
+ val cur = onCompleted
+ onCompleted = if (cur == null) {
+ block
+ } else {
+ fun Worker.(prog: Double, el: Long) {
+ cur(prog, el)
+ block(prog, el)
+ }
+ }
return this
}
@@ -265,7 +293,7 @@ private class WorkerImpl(val functionHelper: FunctionHelper,
}
try {
- val job = functionHelper.launchLazilyOnMainThread { task() }
+ val job = launch(start = LAZY) { task() }
initJob(job = job)
job.start()
} catch (t: Throwable) {
@@ -275,6 +303,18 @@ private class WorkerImpl(val functionHelper: FunctionHelper,
return continuation == null
}
+ override suspend fun awaitCompletion() {
+ if (isComplete) return
+
+ // easy path - if the job was initialized already
+ job?.apply { join(); return }
+
+ // other way.
+ return suspendCoroutine { cont ->
+ onCompleted { prog, el -> cont.resume(Unit) }
+ }
+ }
+
}
/*