diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt')
-rw-r--r-- | src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt | 84 |
1 files changed, 62 insertions, 22 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt index ea4db62..f735903 100644 --- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt +++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt @@ -1,38 +1,42 @@ package io.dico.parcels2.blockvisitor import io.dico.parcels2.ParcelsPlugin -import io.dico.parcels2.util.FunctionHelper -import kotlinx.coroutines.experimental.CancellationException -import kotlinx.coroutines.experimental.Job +import io.dico.parcels2.logger +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart.LAZY +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch import org.bukkit.scheduler.BukkitTask import java.lang.System.currentTimeMillis import java.util.LinkedList -import java.util.logging.Level -import kotlin.coroutines.experimental.Continuation -import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED -import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn +import kotlin.coroutines.Continuation +import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED +import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine typealias TimeLimitedTask = suspend WorkerScope.() -> Unit typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) -sealed class WorktimeLimiter { +interface WorktimeLimiter { /** * Submit a [task] that should be run synchronously, but limited such that it does not stall the server * a bunch */ - abstract fun submit(task: TimeLimitedTask): Worker + fun submit(task: TimeLimitedTask): Worker /** * Get a list of all workers */ - abstract val workers: List<Worker> + val workers: List<Worker> /** * Attempts to complete any remaining tasks immediately, without suspension. */ - abstract fun completeAllTasks() + fun completeAllTasks() } interface Timed { @@ -77,9 +81,14 @@ interface Worker : Timed { /** * Calls the given [block] when this worker completes, with the progress value 1.0. - * Repeated invocations of this method result in an [IllegalStateException] + * Multiple listeners may be registered to this function. */ fun onCompleted(block: WorkerUpdateLister): Worker + + /** + * Await completion of this worker + */ + suspend fun awaitCompletion() } interface WorkerScope : Timed { @@ -113,7 +122,7 @@ private interface WorkerContinuation : Worker, WorkerScope { * There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick * This object attempts to split that maximum amount of milliseconds equally between all jobs */ -class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorktimeLimiter() { +class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWorktimeOptions) : WorktimeLimiter { // The currently registered bukkit scheduler task private var bukkitTask: BukkitTask? = null // The workers. @@ -121,12 +130,12 @@ class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWo override val workers: List<Worker> = _workers override fun submit(task: TimeLimitedTask): Worker { - val worker: WorkerContinuation = WorkerImpl(plugin.functionHelper, task) + val worker: WorkerContinuation = WorkerImpl(plugin, task) if (bukkitTask == null) { val completed = worker.resume(options.workTime.toLong()) if (completed) return worker - bukkitTask = plugin.functionHelper.scheduleRepeating(0, options.tickInterval) { tickJobs() } + bukkitTask = plugin.scheduleRepeating(0, options.tickInterval) { tickJobs() } } _workers.addFirst(worker) @@ -171,8 +180,10 @@ class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWo } -private class WorkerImpl(val functionHelper: FunctionHelper, - val task: TimeLimitedTask) : WorkerContinuation { +private class WorkerImpl( + val scope: CoroutineScope, + val task: TimeLimitedTask +) : WorkerContinuation, CoroutineScope by scope { override var job: Job? = null; private set override val elapsedTime @@ -204,27 +215,44 @@ private class WorkerImpl(val functionHelper: FunctionHelper, // report any error that occurred completionException = exception?.also { if (it !is CancellationException) - functionHelper.plugin.logger.log(Level.SEVERE, "TimeLimitedTask for plugin ${functionHelper.plugin.name} generated an exception", it) + logger.error("TimeLimitedTask generated an exception", it) } // convert to elapsed time here startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime onCompleted?.let { it(1.0, elapsedTime) } + + onCompleted = null + onProgressUpdate = { prog, el -> } } } override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker { onProgressUpdate?.let { throw IllegalStateException() } + if (asCompletionListener) onCompleted(block) + if (isComplete) return this onProgressUpdate = block progressUpdateInterval = minInterval lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval - if (asCompletionListener) onCompleted(block) + return this } override fun onCompleted(block: WorkerUpdateLister): Worker { - onCompleted?.let { throw IllegalStateException() } - onCompleted = block + if (isComplete) { + block(1.0, startTimeOrElapsedTime) + return this + } + + val cur = onCompleted + onCompleted = if (cur == null) { + block + } else { + fun Worker.(prog: Double, el: Long) { + cur(prog, el) + block(prog, el) + } + } return this } @@ -265,7 +293,7 @@ private class WorkerImpl(val functionHelper: FunctionHelper, } try { - val job = functionHelper.launchLazilyOnMainThread { task() } + val job = launch(start = LAZY) { task() } initJob(job = job) job.start() } catch (t: Throwable) { @@ -275,6 +303,18 @@ private class WorkerImpl(val functionHelper: FunctionHelper, return continuation == null } + override suspend fun awaitCompletion() { + if (isComplete) return + + // easy path - if the job was initialized already + job?.apply { join(); return } + + // other way. + return suspendCoroutine { cont -> + onCompleted { prog, el -> cont.resume(Unit) } + } + } + } /* |