summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDico Karssiens <dico.karssiens@gmail.com>2018-07-30 13:35:45 +0100
committerDico Karssiens <dico.karssiens@gmail.com>2018-07-30 13:35:45 +0100
commitee287253d6e29e8fa30c82674337ca4a962bb1d7 (patch)
tree12d0748706edd0b7bf07599f328d17b65d2443e9
parent72c82371b1c5fa41ae96093d3929c7244ce4bcdc (diff)
Improve WorktimeLimiter api
-rw-r--r--src/main/kotlin/io/dico/parcels2/WorldGenerator.kt4
-rw-r--r--src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt291
-rw-r--r--src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt7
3 files changed, 152 insertions, 150 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt b/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt
index e61788c..724eba7 100644
--- a/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt
+++ b/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt
@@ -1,6 +1,6 @@
package io.dico.parcels2
-import io.dico.parcels2.blockvisitor.JobData
+import io.dico.parcels2.blockvisitor.Worker
import io.dico.parcels2.blockvisitor.RegionTraversal
import io.dico.parcels2.util.*
import org.bukkit.*
@@ -50,7 +50,7 @@ abstract class ParcelGenerator : ChunkGenerator(), ParcelProvider {
abstract fun getBlocks(parcel: Parcel, yRange: IntRange = 0..255): Iterator<Block>
- abstract fun clearParcel(parcel: Parcel): JobData
+ abstract fun clearParcel(parcel: Parcel): Worker
}
diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
index 08521fc..0eca6c9 100644
--- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
+++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt
@@ -1,229 +1,235 @@
package io.dico.parcels2.blockvisitor
-import io.dico.parcels2.Options
-import kotlinx.coroutines.experimental.CoroutineStart
-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.asCoroutineDispatcher
-import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.*
import org.bukkit.plugin.Plugin
import org.bukkit.scheduler.BukkitTask
+import java.lang.System.currentTimeMillis
import java.util.*
import java.util.concurrent.Executor
import kotlin.coroutines.experimental.Continuation
-import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn
-interface WorktimeLimiter {
+typealias TimeLimitedTask = suspend WorkerScope.() -> Unit
+typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit
+
+data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
+
+sealed class WorktimeLimiter {
/**
- * Submit a task that should be run synchronously, but limited such that it does not stall the server
+ * Submit a [task] that should be run synchronously, but limited such that it does not stall the server
* a bunch
*/
- fun submit(job: TimeLimitedTask): JobData
+ abstract fun submit(task: TimeLimitedTask): Worker
/**
- * A task should call this frequently during its execution, such that the timer can suspend it when necessary.
+ * Get a list of all workers
*/
- suspend fun markSuspensionPoint()
+ abstract val workers: List<Worker>
+}
+interface Timed {
/**
- * A task should call this method to indicate its progress
+ * The time that elapsed since this worker was dispatched, in milliseconds
*/
- fun setProgress(progress: Double)
+ val elapsedTime: Long
}
-typealias TimeLimitedTask = suspend WorktimeLimiter.() -> Unit
-
-interface JobData {
+interface Worker : Timed {
/**
- * The coroutine associated with this task, if any
+ * The coroutine associated with this worker, if any
*/
val job: Job?
/**
- * The time that elapsed since this task was dispatched, in milliseconds
- */
- val elapsedTime: Long
-
- /**
- * true if this task has completed
+ * true if this worker has completed
*/
val isComplete: Boolean
/**
- * A value indicating the progress of this task, in the range 0.0 <= progress <= 1.0
+ * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0
* with no guarantees to its accuracy. May be null.
*/
val progress: Double?
/**
- * Calls the given [block] whenever the progress is updated,
+ * Calls the given [block] whenever the progress of this worker is updated,
* if [minInterval] milliseconds expired since the last call.
* The first call occurs after at least [minDelay] milliseconds in a likewise manner.
* Repeated invocations of this method result in an [IllegalStateException]
*
* if [asCompletionListener] is true, [onCompleted] is called with the same [block]
*/
- fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: JobUpdateListener): JobData
+ fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: WorkerUpdateLister): Worker
/**
- * Calls the given [block] when this job completes, with the progress value 1.0.
+ * Calls the given [block] when this worker completes, with the progress value 1.0.
* Repeated invocations of this method result in an [IllegalStateException]
*/
- fun onCompleted(block: JobUpdateListener): JobData
+ fun onCompleted(block: WorkerUpdateLister): Worker
}
-typealias JobUpdateListener = JobData.(Double, Long) -> Unit
+interface WorkerScope : Timed {
+ /**
+ * A task should call this frequently during its execution, such that the timer can suspend it when necessary.
+ */
+ suspend fun markSuspensionPoint()
+
+ /**
+ * A task should call this method to indicate its progress
+ */
+ fun setProgress(progress: Double)
+}
-class JobDataImpl(val task: TimeLimitedTask) : JobData {
+private interface WorkerContinuation : Worker, WorkerScope {
+ /**
+ * Start or resume the execution of this worker
+ * returns true if the worker completed
+ */
+ fun resume(worktime: Long): Boolean
+}
- override var job: Job? = null
- set(value) {
- field?.let { throw IllegalStateException() }
- field = value!!
- startTimeOrElapsedTime = System.currentTimeMillis()
- value.invokeOnCompletion {
- startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime
- onCompletedBlock?.invoke(this, 1.0, elapsedTime)
- }
- }
+/**
+ * An object that controls one or more jobs, ensuring that they don't stall the server too much.
+ * There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick
+ * This object attempts to split that maximum amount of milliseconds equally between all jobs
+ */
+class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeOptions) : WorktimeLimiter() {
+ // Coroutine dispatcher for jobs
+ private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher()
+ // The currently registered bukkit scheduler task
+ private var bukkitTask: BukkitTask? = null
+ // The workers.
+ private var _workers = LinkedList<WorkerContinuation>()
+ override val workers: List<Worker> = _workers
+
+ override fun submit(task: TimeLimitedTask): Worker {
+ val worker: WorkerContinuation = WorkerImpl(dispatcher, task)
+ _workers.addFirst(worker)
+ if (bukkitTask == null) bukkitTask = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong())
+ return worker
+ }
- // when running: startTime, else: total elapsed time
- private var startTimeOrElapsedTime: Long = 0L
- override val elapsedTime get() = job?.let { if (it.isCompleted) startTimeOrElapsedTime else System.currentTimeMillis() - startTimeOrElapsedTime } ?: 0L
+ private fun tickJobs() {
+ val workers = _workers
+ if (workers.isEmpty()) return
+ val tickStartTime = System.currentTimeMillis()
- var next: Continuation<Unit>? = null
+ val iterator = workers.listIterator(index = 0)
+ while (iterator.hasNext()) {
+ val time = System.currentTimeMillis()
+ val timeElapsed = time - tickStartTime
+ val timeLeft = options.workTime - timeElapsed
+ if (timeLeft <= 0) return
- override var progress: Double? = null
- set(value) {
- field = value
- doProgressUpdate()
+ val count = iterator.nextIndex()
+ val timePerJob = (timeLeft + count - 1) / count
+ val worker = iterator.next()
+ val completed = worker.resume(timePerJob)
+ if (completed) {
+ iterator.remove()
+ }
}
- private fun doProgressUpdate() {
- val progressUpdate = progressUpdateBlock ?: return
- val time = System.currentTimeMillis()
- if (time > lastUpdateTime + progressUpdateInterval) {
- progressUpdate(progress!!, elapsedTime)
- lastUpdateTime = time
+ if (workers.isEmpty()) {
+ bukkitTask?.cancel()
+ bukkitTask = null
}
}
- private var progressUpdateBlock: JobUpdateListener? = null
+}
+
+private class WorkerImpl(val dispatcher: CoroutineDispatcher,
+ val task: TimeLimitedTask) : WorkerContinuation {
+ override var job: Job? = null; private set
+
+ override val elapsedTime
+ get() = job?.let {
+ if (it.isCompleted) startTimeOrElapsedTime
+ else currentTimeMillis() - startTimeOrElapsedTime
+ } ?: 0L
+
+ override val isComplete get() = job?.isCompleted == true
+
+ override var progress: Double? = null; private set
+
+ private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise
+ private var onProgressUpdate: WorkerUpdateLister? = null
private var progressUpdateInterval: Int = 0
private var lastUpdateTime: Long = 0L
- override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: JobUpdateListener): JobDataImpl {
- progressUpdateBlock?.let { throw IllegalStateException() }
- progressUpdateBlock = block
+ private var onCompleted: WorkerUpdateLister? = null
+ private var continuation: Continuation<Unit>? = null
+ private var nextSuspensionTime: Long = 0L
+
+ private fun initJob(job: Job) {
+ this.job?.let { throw IllegalStateException() }
+ this.job = job
+ startTimeOrElapsedTime = System.currentTimeMillis()
+ job.invokeOnCompletion {
+ // convert to elapsed time here
+ startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime
+ onCompleted?.let { it(1.0, elapsedTime) }
+ }
+ }
+
+ override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker {
+ onProgressUpdate?.let { throw IllegalStateException() }
+ onProgressUpdate = block
progressUpdateInterval = minInterval
lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval
if (asCompletionListener) onCompleted(block)
return this
}
- override val isComplete get() = job?.isCompleted == true
- private var onCompletedBlock: JobUpdateListener? = null
- override fun onCompleted(block: JobUpdateListener): JobDataImpl {
- onCompletedBlock?.let { throw IllegalStateException() }
- onCompletedBlock = block
+ override fun onCompleted(block: WorkerUpdateLister): Worker {
+ onCompleted?.let { throw IllegalStateException() }
+ onCompleted = block
return this
}
-}
-
-/**
- * An object that controls one or more jobs, ensuring that they don't stall the server too much.
- * The amount of milliseconds that can accumulate each server tick is configurable
- */
-class TickWorktimeLimiter(private val plugin: Plugin, private val optionsRoot: Options) : WorktimeLimiter {
- // Coroutine dispatcher for jobs
- private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher()
- // union of Continuation<Unit> and suspend WorktimeLimited.() -> Unit
- private var jobs = LinkedList<JobDataImpl>()
- // The currently registered bukkit scheduler task
- private var task: BukkitTask? = null
- // The data associated with the task that is currently being executed
- private var curJobData: JobDataImpl? = null
- // Used to keep track of when the current task should end
- private var curJobEndTime = 0L
- // Tick work time options
- private inline val options get() = optionsRoot.tickWorktime
-
- override fun submit(job: TimeLimitedTask): JobData {
- val jobData = JobDataImpl(job)
- jobs.addFirst(jobData)
- if (task == null) task = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong())
- return jobData
- }
-
override suspend fun markSuspensionPoint() {
- if (System.currentTimeMillis() >= curJobEndTime)
- suspendCoroutineUninterceptedOrReturn(::scheduleContinuation)
+ if (System.currentTimeMillis() >= nextSuspensionTime)
+ suspendCoroutineUninterceptedOrReturn { cont: Continuation<Unit> ->
+ continuation = cont
+ COROUTINE_SUSPENDED
+ }
}
override fun setProgress(progress: Double) {
- curJobData!!.progress = progress
+ this.progress = progress
+ val onProgressUpdate = onProgressUpdate ?: return
+ val time = System.currentTimeMillis()
+ if (time > lastUpdateTime + progressUpdateInterval) {
+ onProgressUpdate(progress, elapsedTime)
+ lastUpdateTime = time
+ }
}
- private fun tickJobs() {
- if (jobs.isEmpty()) return
- val tickStartTime = System.currentTimeMillis()
- val jobs = this.jobs; this.jobs = LinkedList()
-
- var count = jobs.size
-
- while (!jobs.isEmpty()) {
- val job = jobs.poll()
- val time = System.currentTimeMillis()
- val timeElapsed = time - tickStartTime
- val timeLeft = options.workTime - timeElapsed
-
- if (timeLeft <= 0) {
- this.jobs.addAll(0, jobs)
- return
- }
+ override fun resume(worktime: Long): Boolean {
+ nextSuspensionTime = currentTimeMillis() + worktime
- val timePerJob = (timeLeft + count - 1) / count
- tickJob(job, time + timePerJob)
- count--
+ continuation?.let {
+ continuation = null
+ it.resume(Unit)
+ return continuation == null
}
- if (jobs.isEmpty() && this.jobs.isEmpty()) {
- task?.cancel()
- task = null
+ job?.let {
+ nextSuspensionTime = 0L
+ throw IllegalStateException()
}
- }
- @Suppress("UNCHECKED_CAST")
- private fun tickJob(job: JobDataImpl, endTime: Long) {
- curJobData = job
- curJobEndTime = endTime
- try {
- val next = job.next
- if (next == null) startJob(job)
- else next.resume(Unit)
- }
- finally {
- curJobData = null
- curJobEndTime = 0L
+ launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) {
+ initJob(job = kotlin.coroutines.experimental.coroutineContext[Job]!!)
+ task()
}
- }
-
- private fun startJob(job: JobDataImpl) {
- job.job = launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { job.task(this@TickWorktimeLimiter) }
- }
- private fun scheduleContinuation(continuation: Continuation<Unit>): Any? {
- curJobData!!.next = continuation
- jobs.addLast(curJobData)
- return COROUTINE_SUSPENDED
+ return continuation == null
}
}
-data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
-
-
+/*
/**
* While the implementation of [kotlin.coroutines.experimental.intrinsics.intercepted] is intrinsic, it should look something like this
* We don't care for intercepting the coroutine as we want it to resume immediately when we call resume().
@@ -231,4 +237,5 @@ data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
*/
private fun <T> Continuation<T>.interceptedImpl(): Continuation<T> {
return context[ContinuationInterceptor]?.interceptContinuation(this) ?: this
-} \ No newline at end of file
+}
+ */
diff --git a/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt b/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt
index 2de847c..c3f8836 100644
--- a/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt
+++ b/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt
@@ -7,13 +7,11 @@ import io.dico.dicore.command.annotation.Desc
import io.dico.dicore.command.annotation.RequireParameters
import io.dico.parcels2.ParcelOwner
import io.dico.parcels2.ParcelsPlugin
-import io.dico.parcels2.blockvisitor.JobUpdateListener
import io.dico.parcels2.command.NamedParcelDefaultValue.FIRST_OWNED
import io.dico.parcels2.storage.getParcelBySerializedValue
import io.dico.parcels2.util.hasAdminManage
import io.dico.parcels2.util.hasParcelHomeOthers
import io.dico.parcels2.util.uuid
-import kotlinx.coroutines.experimental.Job
import org.bukkit.entity.Player
//@Suppress("unused")
@@ -84,11 +82,8 @@ class CommandsGeneral(plugin: ParcelsPlugin) : AbstractParcelCommands(plugin) {
@Cmd("clear")
@ParcelRequire(owner = true)
fun ParcelScope.cmdClear(player: Player, context: ExecutionContext) {
- val onProgressUpdate: JobUpdateListener = { progress, elapsedTime ->
- context.sendMessage("[Clearing] Progress: %.06f%%".format(progress * 100))
- }
world.generator.clearParcel(parcel)
- .onProgressUpdate(5, 5) { progress, elapsedTime ->
+ .onProgressUpdate(1000, 1000) { progress, elapsedTime ->
context.sendMessage(EMessageType.INFORMATIVE, "Clear progress: %.06f%%, %.2fs elapsed"
.format(progress * 100, elapsedTime / 1000.0))
}