diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/JobDispatcher.kt')
-rw-r--r-- | src/main/kotlin/io/dico/parcels2/JobDispatcher.kt | 61 |
1 files changed, 46 insertions, 15 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt b/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt index 10da0da..ebbe334 100644 --- a/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt +++ b/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt @@ -1,6 +1,8 @@ package io.dico.parcels2 +import io.dico.parcels2.util.PluginAware import io.dico.parcels2.util.math.clampMin +import io.dico.parcels2.util.scheduleRepeating import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart.LAZY @@ -74,7 +76,12 @@ interface Job : JobAndScopeMembersUnion { * * if [asCompletionListener] is true, [onCompleted] is called with the same [block] */ - fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: JobUpdateLister): Job + fun onProgressUpdate( + minDelay: Int, + minInterval: Int, + asCompletionListener: Boolean = true, + block: JobUpdateLister + ): Job /** * Calls the given [block] when this job completes, with the progress value 1.0. @@ -138,7 +145,11 @@ interface JobInternal : Job, JobScope { * There is a configurable maxiumum amount of milliseconds that can be allocated to all jobs together in each server tick * This object attempts to split that maximum amount of milliseconds equally between all jobs */ -class BukkitJobDispatcher(private val plugin: ParcelsPlugin, var options: TickJobtimeOptions) : JobDispatcher { +class BukkitJobDispatcher( + private val plugin: PluginAware, + private val scope: CoroutineScope, + var options: TickJobtimeOptions +) : JobDispatcher { // The currently registered bukkit scheduler task private var bukkitTask: BukkitTask? = null // The jobs. @@ -146,18 +157,18 @@ class BukkitJobDispatcher(private val plugin: ParcelsPlugin, var options: TickJo override val jobs: List<Job> = _jobs override fun dispatch(function: JobFunction): Job { - val job: JobInternal = JobImpl(plugin, function) + val job: JobInternal = JobImpl(scope, function) if (bukkitTask == null) { val completed = job.resume(options.jobTime.toLong()) if (completed) return job - bukkitTask = plugin.scheduleRepeating(0, options.tickInterval) { tickCoroutineJobs() } + bukkitTask = plugin.scheduleRepeating(options.tickInterval) { tickJobs() } } _jobs.addFirst(job) return job } - private fun tickCoroutineJobs() { + private fun tickJobs() { val jobs = _jobs if (jobs.isEmpty()) return val tickStartTime = System.currentTimeMillis() @@ -237,7 +248,12 @@ private class JobImpl(scope: CoroutineScope, task: JobFunction) : JobInternal { } } - override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: JobUpdateLister): Job { + override fun onProgressUpdate( + minDelay: Int, + minInterval: Int, + asCompletionListener: Boolean, + block: JobUpdateLister + ): Job { onProgressUpdate?.let { throw IllegalStateException() } if (asCompletionListener) onCompleted(block) if (isComplete) return this @@ -296,7 +312,11 @@ private class JobImpl(scope: CoroutineScope, task: JobFunction) : JobInternal { if (isStarted) { continuation?.let { continuation = null - it.resume(Unit) + + wrapExternalCall { + it.resume(Unit) + } + return continuation == null } return true @@ -304,34 +324,45 @@ private class JobImpl(scope: CoroutineScope, task: JobFunction) : JobInternal { isStarted = true startTimeOrElapsedTime = System.currentTimeMillis() - coroutine.start() + + wrapExternalCall { + coroutine.start() + } return continuation == null } + private inline fun wrapExternalCall(block: () -> Unit) { + try { + block() + } catch (ex: Throwable) { + logger.error("Job $coroutine generated an exception", ex) + } + } + override suspend fun awaitCompletion() { coroutine.join() } private fun delegateProgress(curPortion: Double, portion: Double): JobScope = - DelegateScope(progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0)) + DelegateScope(this, progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0)) override fun delegateProgress(portion: Double): JobScope = delegateProgress(1.0, portion) - private inner class DelegateScope(val progressStart: Double, val portion: Double) : JobScope { + private class DelegateScope(val parent: JobImpl, val progressStart: Double, val portion: Double) : JobScope { override val elapsedTime: Long - get() = this@JobImpl.elapsedTime + get() = parent.elapsedTime override suspend fun markSuspensionPoint() = - this@JobImpl.markSuspensionPoint() + parent.markSuspensionPoint() override val progress: Double - get() = (this@JobImpl.progress - progressStart) / portion + get() = (parent.progress - progressStart) / portion override fun setProgress(progress: Double) = - this@JobImpl.setProgress(progressStart + progress * portion) + parent.setProgress(progressStart + progress * portion) override fun delegateProgress(portion: Double): JobScope = - this@JobImpl.delegateProgress(this.portion, portion) + parent.delegateProgress(this.portion, portion) } } |