summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt129
1 files changed, 81 insertions, 48 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
index 4cfd2a3..553362e 100644
--- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
+++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
@@ -2,6 +2,7 @@ package io.dico.parcels2.blockvisitor
import io.dico.parcels2.ParcelsPlugin
import io.dico.parcels2.logger
+import io.dico.parcels2.util.ext.clampMin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
@@ -14,7 +15,6 @@ import kotlin.coroutines.Continuation
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
-import kotlin.coroutines.suspendCoroutine
typealias TimeLimitedTask = suspend WorkerScope.() -> Unit
typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit
@@ -48,9 +48,9 @@ interface Timed {
interface Worker : Timed {
/**
- * The coroutine associated with this worker, if any
+ * The coroutine associated with this worker
*/
- val job: Job?
+ val job: Job
/**
* true if this worker has completed
@@ -65,9 +65,9 @@ interface Worker : Timed {
/**
* A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0
- * with no guarantees to its accuracy. May be null.
+ * with no guarantees to its accuracy.
*/
- val progress: Double?
+ val progress: Double
/**
* Calls the given [block] whenever the progress of this worker is updated,
@@ -89,6 +89,11 @@ interface Worker : Timed {
* Await completion of this worker
*/
suspend fun awaitCompletion()
+
+ /**
+ * An object attached to this worker
+ */
+ //val attachment: Any?
}
interface WorkerScope : Timed {
@@ -98,9 +103,34 @@ interface WorkerScope : Timed {
suspend fun markSuspensionPoint()
/**
+ * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0
+ * with no guarantees to its accuracy.
+ */
+ val progress: Double
+
+ /**
* A task should call this method to indicate its progress
*/
fun setProgress(progress: Double)
+
+ /**
+ * Indicate that this job is complete
+ */
+ fun markComplete() = setProgress(1.0)
+
+ /**
+ * Get a [WorkerScope] that is responsible for [portion] part of the progress
+ * If [portion] is negative, the remainder of the progress is used
+ */
+ fun delegateWork(portion: Double = -1.0): WorkerScope
+}
+
+inline fun <T> WorkerScope.delegateWork(portion: Double = -1.0, block: WorkerScope.() -> T): T {
+ delegateWork(portion).apply {
+ val result = block()
+ markComplete()
+ return result
+ }
}
interface WorkerInternal : Worker, WorkerScope {
@@ -179,37 +209,32 @@ class TickWorktimeLimiter(private val plugin: ParcelsPlugin, var options: TickWo
}
-private class WorkerImpl(
- val scope: CoroutineScope,
- val task: TimeLimitedTask
-) : WorkerInternal, CoroutineScope by scope {
- override var job: Job? = null; private set
+private class WorkerImpl(scope: CoroutineScope, task: TimeLimitedTask) : WorkerInternal {
+ override val job: Job = scope.launch(start = LAZY) { task() }
+
+ private var continuation: Continuation<Unit>? = null
+ private var nextSuspensionTime: Long = 0L
+ private var completeForcefully = false
+ private var isStarted = false
override val elapsedTime
- get() = job?.let {
- if (it.isCompleted) startTimeOrElapsedTime
+ get() =
+ if (job.isCompleted) startTimeOrElapsedTime
else currentTimeMillis() - startTimeOrElapsedTime
- } ?: 0L
- override val isComplete get() = job?.isCompleted == true
+ override val isComplete get() = job.isCompleted
+ private var _progress = 0.0
+ override val progress get() = _progress
override var completionException: Throwable? = null; private set
- override var progress: Double? = null; private set
-
private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise
private var onProgressUpdate: WorkerUpdateLister? = null
private var progressUpdateInterval: Int = 0
private var lastUpdateTime: Long = 0L
private var onCompleted: WorkerUpdateLister? = null
- private var continuation: Continuation<Unit>? = null
- private var nextSuspensionTime: Long = 0L
- private var completeForcefully = false
- private fun initJob(job: Job) {
- this.job?.let { throw IllegalStateException() }
- this.job = job
- startTimeOrElapsedTime = System.currentTimeMillis()
+ init {
job.invokeOnCompletion { exception ->
// report any error that occurred
completionException = exception?.also {
@@ -264,7 +289,7 @@ private class WorkerImpl(
}
override fun setProgress(progress: Double) {
- this.progress = progress
+ this._progress = progress
val onProgressUpdate = onProgressUpdate ?: return
val time = System.currentTimeMillis()
if (time > lastUpdateTime + progressUpdateInterval) {
@@ -274,46 +299,54 @@ private class WorkerImpl(
}
override fun resume(worktime: Long): Boolean {
+ if (isComplete) return true
+
if (worktime > 0) {
nextSuspensionTime = currentTimeMillis() + worktime
} else {
completeForcefully = true
}
- continuation?.let {
- continuation = null
- it.resume(Unit)
- return continuation == null
- }
-
- job?.let {
- nextSuspensionTime = 0L
- throw IllegalStateException()
+ if (isStarted) {
+ continuation?.let {
+ continuation = null
+ it.resume(Unit)
+ return continuation == null
+ }
+ return true
}
- try {
- val job = launch(start = LAZY) { task() }
- initJob(job = job)
- job.start()
- } catch (t: Throwable) {
- // do nothing: handled by job.invokeOnCompletion()
- }
+ startTimeOrElapsedTime = System.currentTimeMillis()
+ job.start()
return continuation == null
}
override suspend fun awaitCompletion() {
- if (isComplete) return
+ job.join()
+ }
- // easy path - if the job was initialized already
- job?.apply { join(); return }
+ private fun delegateWork(curPortion: Double, portion: Double): WorkerScope =
+ DelegateScope(progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0))
- // other way.
- return suspendCoroutine { cont ->
- onCompleted { prog, el -> cont.resume(Unit) }
- }
- }
+ override fun delegateWork(portion: Double): WorkerScope = delegateWork(1.0, portion)
+
+ private inner class DelegateScope(val progressStart: Double, val portion: Double) : WorkerScope {
+ override val elapsedTime: Long
+ get() = this@WorkerImpl.elapsedTime
+ override suspend fun markSuspensionPoint() =
+ this@WorkerImpl.markSuspensionPoint()
+
+ override val progress: Double
+ get() = (this@WorkerImpl.progress - progressStart) / portion
+
+ override fun setProgress(progress: Double) =
+ this@WorkerImpl.setProgress(progressStart + progress * portion)
+
+ override fun delegateWork(portion: Double): WorkerScope =
+ this@WorkerImpl.delegateWork(this.portion, portion)
+ }
}
/*