summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/JobDispatcher.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/JobDispatcher.kt61
1 files changed, 46 insertions, 15 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt b/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt
index 10da0da..ebbe334 100644
--- a/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt
+++ b/src/main/kotlin/io/dico/parcels2/JobDispatcher.kt
@@ -1,6 +1,8 @@
package io.dico.parcels2
+import io.dico.parcels2.util.PluginAware
import io.dico.parcels2.util.math.clampMin
+import io.dico.parcels2.util.scheduleRepeating
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
@@ -74,7 +76,12 @@ interface Job : JobAndScopeMembersUnion {
*
* if [asCompletionListener] is true, [onCompleted] is called with the same [block]
*/
- fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: JobUpdateLister): Job
+ fun onProgressUpdate(
+ minDelay: Int,
+ minInterval: Int,
+ asCompletionListener: Boolean = true,
+ block: JobUpdateLister
+ ): Job
/**
* Calls the given [block] when this job completes, with the progress value 1.0.
@@ -138,7 +145,11 @@ interface JobInternal : Job, JobScope {
* There is a configurable maxiumum amount of milliseconds that can be allocated to all jobs together in each server tick
* This object attempts to split that maximum amount of milliseconds equally between all jobs
*/
-class BukkitJobDispatcher(private val plugin: ParcelsPlugin, var options: TickJobtimeOptions) : JobDispatcher {
+class BukkitJobDispatcher(
+ private val plugin: PluginAware,
+ private val scope: CoroutineScope,
+ var options: TickJobtimeOptions
+) : JobDispatcher {
// The currently registered bukkit scheduler task
private var bukkitTask: BukkitTask? = null
// The jobs.
@@ -146,18 +157,18 @@ class BukkitJobDispatcher(private val plugin: ParcelsPlugin, var options: TickJo
override val jobs: List<Job> = _jobs
override fun dispatch(function: JobFunction): Job {
- val job: JobInternal = JobImpl(plugin, function)
+ val job: JobInternal = JobImpl(scope, function)
if (bukkitTask == null) {
val completed = job.resume(options.jobTime.toLong())
if (completed) return job
- bukkitTask = plugin.scheduleRepeating(0, options.tickInterval) { tickCoroutineJobs() }
+ bukkitTask = plugin.scheduleRepeating(options.tickInterval) { tickJobs() }
}
_jobs.addFirst(job)
return job
}
- private fun tickCoroutineJobs() {
+ private fun tickJobs() {
val jobs = _jobs
if (jobs.isEmpty()) return
val tickStartTime = System.currentTimeMillis()
@@ -237,7 +248,12 @@ private class JobImpl(scope: CoroutineScope, task: JobFunction) : JobInternal {
}
}
- override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: JobUpdateLister): Job {
+ override fun onProgressUpdate(
+ minDelay: Int,
+ minInterval: Int,
+ asCompletionListener: Boolean,
+ block: JobUpdateLister
+ ): Job {
onProgressUpdate?.let { throw IllegalStateException() }
if (asCompletionListener) onCompleted(block)
if (isComplete) return this
@@ -296,7 +312,11 @@ private class JobImpl(scope: CoroutineScope, task: JobFunction) : JobInternal {
if (isStarted) {
continuation?.let {
continuation = null
- it.resume(Unit)
+
+ wrapExternalCall {
+ it.resume(Unit)
+ }
+
return continuation == null
}
return true
@@ -304,34 +324,45 @@ private class JobImpl(scope: CoroutineScope, task: JobFunction) : JobInternal {
isStarted = true
startTimeOrElapsedTime = System.currentTimeMillis()
- coroutine.start()
+
+ wrapExternalCall {
+ coroutine.start()
+ }
return continuation == null
}
+ private inline fun wrapExternalCall(block: () -> Unit) {
+ try {
+ block()
+ } catch (ex: Throwable) {
+ logger.error("Job $coroutine generated an exception", ex)
+ }
+ }
+
override suspend fun awaitCompletion() {
coroutine.join()
}
private fun delegateProgress(curPortion: Double, portion: Double): JobScope =
- DelegateScope(progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0))
+ DelegateScope(this, progress, curPortion * (if (portion < 0) 1.0 - progress else portion).clampMin(0.0))
override fun delegateProgress(portion: Double): JobScope = delegateProgress(1.0, portion)
- private inner class DelegateScope(val progressStart: Double, val portion: Double) : JobScope {
+ private class DelegateScope(val parent: JobImpl, val progressStart: Double, val portion: Double) : JobScope {
override val elapsedTime: Long
- get() = this@JobImpl.elapsedTime
+ get() = parent.elapsedTime
override suspend fun markSuspensionPoint() =
- this@JobImpl.markSuspensionPoint()
+ parent.markSuspensionPoint()
override val progress: Double
- get() = (this@JobImpl.progress - progressStart) / portion
+ get() = (parent.progress - progressStart) / portion
override fun setProgress(progress: Double) =
- this@JobImpl.setProgress(progressStart + progress * portion)
+ parent.setProgress(progressStart + progress * portion)
override fun delegateProgress(portion: Double): JobScope =
- this@JobImpl.delegateProgress(this.portion, portion)
+ parent.delegateProgress(this.portion, portion)
}
}