diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt')
-rw-r--r-- | src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt | 129 |
1 files changed, 81 insertions, 48 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt index 4cfd2a3..553362e 100644 --- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt +++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt @@ -2,6 +2,7 @@ package io.dico.parcels2.blockvisitor import io.dico.parcels2.ParcelsPlugin import io.dico.parcels2.logger +import io.dico.parcels2.util.ext.clampMin import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart.LAZY @@ -14,7 +15,6 @@ import kotlin.coroutines.Continuation import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine typealias TimeLimitedTask = suspend WorkerScope.() -> Unit typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit @@ -48,9 +48,9 @@ interface Timed { interface Worker : Timed { /** - * The coroutine associated with this worker, if any + * The coroutine associated with this worker */ - val job: Job? + val job: Job /** * true if this worker has completed @@ -65,9 +65,9 @@ interface Worker : Timed { /** * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0 - * with no guarantees to its accuracy. May be null. + * with no guarantees to its accuracy. */ - val progress: Double? + val progress: Double /** * Calls the given [block] whenever the progress of this worker is updated, @@ -89,6 +89,11 @@ interface Worker : Timed { * Await completion of this worker */ suspend fun awaitCompletion() + + /** + * An object attached to this worker + */ + //val attachment: Any? } interface WorkerScope : Timed { @@ -98,9 +103,34 @@ interface WorkerScope : Timed { suspend fun markSuspensionPoint() /** + * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0 + * with no guarantees to its accuracy. + */ + val progress: Double + + /** * A task should call this method to indicate its progress */ fun setProgress(progress: Double) + + /** + * Indicate that this job is complete + */ + fun markComplete() = setProgress(1.0) + + /** + * Get a [WorkerScope] that is responsible for [portion] part of the progress + * If [portion] is negative, the remainder of the progress is used + */ + fun delegateWork(portion: Double = -1.0): WorkerScope +} + +inline fun <T> WorkerScope.delegateWork(portion: Double = -1.0, block: WorkerScope.() -> T): T { + delegateWork(portion).apply { + val result = block() + markComplete() + return result + } } interface WorkerInternal : Worker, WorkerScope { @@ -179,37 +209,32 @@ class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWo } -private class WorkerImpl( - val scope: CoroutineScope, - val task: TimeLimitedTask -) : WorkerInternal, CoroutineScope by scope { - override var job: Job? = null; private set +private class WorkerImpl(scope: CoroutineScope, task: TimeLimitedTask) : WorkerInternal { + override val job: Job = scope.launch(start = LAZY) { task() } + + private var continuation: Continuation<Unit>? = null + private var nextSuspensionTime: Long = 0L + private var completeForcefully = false + private var isStarted = false override val elapsedTime - get() = job?.let { - if (it.isCompleted) startTimeOrElapsedTime + get() = + if (job.isCompleted) startTimeOrElapsedTime else currentTimeMillis() - startTimeOrElapsedTime - } ?: 0L - override val isComplete get() = job?.isCompleted == true + override val isComplete get() = job.isCompleted + private var _progress = 0.0 + override val progress get() = _progress override var completionException: Throwable? = null; private set - override var progress: Double? = null; private set - private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise private var onProgressUpdate: WorkerUpdateLister? = null private var progressUpdateInterval: Int = 0 private var lastUpdateTime: Long = 0L private var onCompleted: WorkerUpdateLister? = null - private var continuation: Continuation<Unit>? = null - private var nextSuspensionTime: Long = 0L - private var completeForcefully = false - private fun initJob(job: Job) { - this.job?.let { throw IllegalStateException() } - this.job = job - startTimeOrElapsedTime = System.currentTimeMillis() + init { job.invokeOnCompletion { exception -> // report any error that occurred completionException = exception?.also { @@ -264,7 +289,7 @@ private class WorkerImpl( } override fun setProgress(progress: Double) { - this.progress = progress + this._progress = progress val onProgressUpdate = onProgressUpdate ?: return val time = System.currentTimeMillis() if (time > lastUpdateTime + progressUpdateInterval) { @@ -274,46 +299,54 @@ private class WorkerImpl( } override fun resume(worktime: Long): Boolean { + if (isComplete) return true + if (worktime > 0) { nextSuspensionTime = currentTimeMillis() + worktime } else { completeForcefully = true } - continuation?.let { - continuation = null - it.resume(Unit) - return continuation == null - } - - job?.let { - nextSuspensionTime = 0L - throw IllegalStateException() + if (isStarted) { + continuation?.let { + continuation = null + it.resume(Unit) + return continuation == null + } + return true } - try { - val job = launch(start = LAZY) { task() } - initJob(job = job) - job.start() - } catch (t: Throwable) { - // do nothing: handled by job.invokeOnCompletion() - } + startTimeOrElapsedTime = System.currentTimeMillis() + job.start() return continuation == null } override suspend fun awaitCompletion() { - if (isComplete) return + job.join() + } - // easy path - if the job was initialized already - job?.apply { join(); return } + private fun delegateWork(curPortion: Double, portion: Double): WorkerScope = + DelegateScope(progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0)) - // other way. - return suspendCoroutine { cont -> - onCompleted { prog, el -> cont.resume(Unit) } - } - } + override fun delegateWork(portion: Double): WorkerScope = delegateWork(1.0, portion) + + private inner class DelegateScope(val progressStart: Double, val portion: Double) : WorkerScope { + override val elapsedTime: Long + get() = this@WorkerImpl.elapsedTime + override suspend fun markSuspensionPoint() = + this@WorkerImpl.markSuspensionPoint() + + override val progress: Double + get() = (this@WorkerImpl.progress - progressStart) / portion + + override fun setProgress(progress: Double) = + this@WorkerImpl.setProgress(progressStart + progress * portion) + + override fun delegateWork(portion: Double): WorkerScope = + this@WorkerImpl.delegateWork(this.portion, portion) + } } /* |