diff options
author | Dico Karssiens <dico.karssiens@gmail.com> | 2018-07-30 13:35:45 +0100 |
---|---|---|
committer | Dico Karssiens <dico.karssiens@gmail.com> | 2018-07-30 13:35:45 +0100 |
commit | ee287253d6e29e8fa30c82674337ca4a962bb1d7 (patch) | |
tree | 12d0748706edd0b7bf07599f328d17b65d2443e9 | |
parent | 72c82371b1c5fa41ae96093d3929c7244ce4bcdc (diff) |
Improve WorktimeLimiter api
3 files changed, 152 insertions, 150 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt b/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt index e61788c..724eba7 100644 --- a/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt +++ b/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt @@ -1,6 +1,6 @@ package io.dico.parcels2 -import io.dico.parcels2.blockvisitor.JobData +import io.dico.parcels2.blockvisitor.Worker import io.dico.parcels2.blockvisitor.RegionTraversal import io.dico.parcels2.util.* import org.bukkit.* @@ -50,7 +50,7 @@ abstract class ParcelGenerator : ChunkGenerator(), ParcelProvider { abstract fun getBlocks(parcel: Parcel, yRange: IntRange = 0..255): Iterator<Block> - abstract fun clearParcel(parcel: Parcel): JobData + abstract fun clearParcel(parcel: Parcel): Worker } diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt index 08521fc..0eca6c9 100644 --- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt +++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt @@ -1,229 +1,235 @@ package io.dico.parcels2.blockvisitor -import io.dico.parcels2.Options -import kotlinx.coroutines.experimental.CoroutineStart -import kotlinx.coroutines.experimental.Job -import kotlinx.coroutines.experimental.asCoroutineDispatcher -import kotlinx.coroutines.experimental.launch +import kotlinx.coroutines.experimental.* import org.bukkit.plugin.Plugin import org.bukkit.scheduler.BukkitTask +import java.lang.System.currentTimeMillis import java.util.* import java.util.concurrent.Executor import kotlin.coroutines.experimental.Continuation -import kotlin.coroutines.experimental.ContinuationInterceptor import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn -interface WorktimeLimiter { +typealias TimeLimitedTask = suspend WorkerScope.() -> Unit +typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit + +data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) + +sealed class WorktimeLimiter { /** - * Submit a task that should be run synchronously, but limited such that it does not stall the server + * Submit a [task] that should be run synchronously, but limited such that it does not stall the server * a bunch */ - fun submit(job: TimeLimitedTask): JobData + abstract fun submit(task: TimeLimitedTask): Worker /** - * A task should call this frequently during its execution, such that the timer can suspend it when necessary. + * Get a list of all workers */ - suspend fun markSuspensionPoint() + abstract val workers: List<Worker> +} +interface Timed { /** - * A task should call this method to indicate its progress + * The time that elapsed since this worker was dispatched, in milliseconds */ - fun setProgress(progress: Double) + val elapsedTime: Long } -typealias TimeLimitedTask = suspend WorktimeLimiter.() -> Unit - -interface JobData { +interface Worker : Timed { /** - * The coroutine associated with this task, if any + * The coroutine associated with this worker, if any */ val job: Job? /** - * The time that elapsed since this task was dispatched, in milliseconds - */ - val elapsedTime: Long - - /** - * true if this task has completed + * true if this worker has completed */ val isComplete: Boolean /** - * A value indicating the progress of this task, in the range 0.0 <= progress <= 1.0 + * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0 * with no guarantees to its accuracy. May be null. */ val progress: Double? /** - * Calls the given [block] whenever the progress is updated, + * Calls the given [block] whenever the progress of this worker is updated, * if [minInterval] milliseconds expired since the last call. * The first call occurs after at least [minDelay] milliseconds in a likewise manner. * Repeated invocations of this method result in an [IllegalStateException] * * if [asCompletionListener] is true, [onCompleted] is called with the same [block] */ - fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: JobUpdateListener): JobData + fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: WorkerUpdateLister): Worker /** - * Calls the given [block] when this job completes, with the progress value 1.0. + * Calls the given [block] when this worker completes, with the progress value 1.0. * Repeated invocations of this method result in an [IllegalStateException] */ - fun onCompleted(block: JobUpdateListener): JobData + fun onCompleted(block: WorkerUpdateLister): Worker } -typealias JobUpdateListener = JobData.(Double, Long) -> Unit +interface WorkerScope : Timed { + /** + * A task should call this frequently during its execution, such that the timer can suspend it when necessary. + */ + suspend fun markSuspensionPoint() + + /** + * A task should call this method to indicate its progress + */ + fun setProgress(progress: Double) +} -class JobDataImpl(val task: TimeLimitedTask) : JobData { +private interface WorkerContinuation : Worker, WorkerScope { + /** + * Start or resume the execution of this worker + * returns true if the worker completed + */ + fun resume(worktime: Long): Boolean +} - override var job: Job? = null - set(value) { - field?.let { throw IllegalStateException() } - field = value!! - startTimeOrElapsedTime = System.currentTimeMillis() - value.invokeOnCompletion { - startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime - onCompletedBlock?.invoke(this, 1.0, elapsedTime) - } - } +/** + * An object that controls one or more jobs, ensuring that they don't stall the server too much. + * There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick + * This object attempts to split that maximum amount of milliseconds equally between all jobs + */ +class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeOptions) : WorktimeLimiter() { + // Coroutine dispatcher for jobs + private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher() + // The currently registered bukkit scheduler task + private var bukkitTask: BukkitTask? = null + // The workers. + private var _workers = LinkedList<WorkerContinuation>() + override val workers: List<Worker> = _workers + + override fun submit(task: TimeLimitedTask): Worker { + val worker: WorkerContinuation = WorkerImpl(dispatcher, task) + _workers.addFirst(worker) + if (bukkitTask == null) bukkitTask = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong()) + return worker + } - // when running: startTime, else: total elapsed time - private var startTimeOrElapsedTime: Long = 0L - override val elapsedTime get() = job?.let { if (it.isCompleted) startTimeOrElapsedTime else System.currentTimeMillis() - startTimeOrElapsedTime } ?: 0L + private fun tickJobs() { + val workers = _workers + if (workers.isEmpty()) return + val tickStartTime = System.currentTimeMillis() - var next: Continuation<Unit>? = null + val iterator = workers.listIterator(index = 0) + while (iterator.hasNext()) { + val time = System.currentTimeMillis() + val timeElapsed = time - tickStartTime + val timeLeft = options.workTime - timeElapsed + if (timeLeft <= 0) return - override var progress: Double? = null - set(value) { - field = value - doProgressUpdate() + val count = iterator.nextIndex() + val timePerJob = (timeLeft + count - 1) / count + val worker = iterator.next() + val completed = worker.resume(timePerJob) + if (completed) { + iterator.remove() + } } - private fun doProgressUpdate() { - val progressUpdate = progressUpdateBlock ?: return - val time = System.currentTimeMillis() - if (time > lastUpdateTime + progressUpdateInterval) { - progressUpdate(progress!!, elapsedTime) - lastUpdateTime = time + if (workers.isEmpty()) { + bukkitTask?.cancel() + bukkitTask = null } } - private var progressUpdateBlock: JobUpdateListener? = null +} + +private class WorkerImpl(val dispatcher: CoroutineDispatcher, + val task: TimeLimitedTask) : WorkerContinuation { + override var job: Job? = null; private set + + override val elapsedTime + get() = job?.let { + if (it.isCompleted) startTimeOrElapsedTime + else currentTimeMillis() - startTimeOrElapsedTime + } ?: 0L + + override val isComplete get() = job?.isCompleted == true + + override var progress: Double? = null; private set + + private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise + private var onProgressUpdate: WorkerUpdateLister? = null private var progressUpdateInterval: Int = 0 private var lastUpdateTime: Long = 0L - override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: JobUpdateListener): JobDataImpl { - progressUpdateBlock?.let { throw IllegalStateException() } - progressUpdateBlock = block + private var onCompleted: WorkerUpdateLister? = null + private var continuation: Continuation<Unit>? = null + private var nextSuspensionTime: Long = 0L + + private fun initJob(job: Job) { + this.job?.let { throw IllegalStateException() } + this.job = job + startTimeOrElapsedTime = System.currentTimeMillis() + job.invokeOnCompletion { + // convert to elapsed time here + startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime + onCompleted?.let { it(1.0, elapsedTime) } + } + } + + override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker { + onProgressUpdate?.let { throw IllegalStateException() } + onProgressUpdate = block progressUpdateInterval = minInterval lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval if (asCompletionListener) onCompleted(block) return this } - override val isComplete get() = job?.isCompleted == true - private var onCompletedBlock: JobUpdateListener? = null - override fun onCompleted(block: JobUpdateListener): JobDataImpl { - onCompletedBlock?.let { throw IllegalStateException() } - onCompletedBlock = block + override fun onCompleted(block: WorkerUpdateLister): Worker { + onCompleted?.let { throw IllegalStateException() } + onCompleted = block return this } -} - -/** - * An object that controls one or more jobs, ensuring that they don't stall the server too much. - * The amount of milliseconds that can accumulate each server tick is configurable - */ -class TickWorktimeLimiter(private val plugin: Plugin, private val optionsRoot: Options) : WorktimeLimiter { - // Coroutine dispatcher for jobs - private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher() - // union of Continuation<Unit> and suspend WorktimeLimited.() -> Unit - private var jobs = LinkedList<JobDataImpl>() - // The currently registered bukkit scheduler task - private var task: BukkitTask? = null - // The data associated with the task that is currently being executed - private var curJobData: JobDataImpl? = null - // Used to keep track of when the current task should end - private var curJobEndTime = 0L - // Tick work time options - private inline val options get() = optionsRoot.tickWorktime - - override fun submit(job: TimeLimitedTask): JobData { - val jobData = JobDataImpl(job) - jobs.addFirst(jobData) - if (task == null) task = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong()) - return jobData - } - override suspend fun markSuspensionPoint() { - if (System.currentTimeMillis() >= curJobEndTime) - suspendCoroutineUninterceptedOrReturn(::scheduleContinuation) + if (System.currentTimeMillis() >= nextSuspensionTime) + suspendCoroutineUninterceptedOrReturn { cont: Continuation<Unit> -> + continuation = cont + COROUTINE_SUSPENDED + } } override fun setProgress(progress: Double) { - curJobData!!.progress = progress + this.progress = progress + val onProgressUpdate = onProgressUpdate ?: return + val time = System.currentTimeMillis() + if (time > lastUpdateTime + progressUpdateInterval) { + onProgressUpdate(progress, elapsedTime) + lastUpdateTime = time + } } - private fun tickJobs() { - if (jobs.isEmpty()) return - val tickStartTime = System.currentTimeMillis() - val jobs = this.jobs; this.jobs = LinkedList() - - var count = jobs.size - - while (!jobs.isEmpty()) { - val job = jobs.poll() - val time = System.currentTimeMillis() - val timeElapsed = time - tickStartTime - val timeLeft = options.workTime - timeElapsed - - if (timeLeft <= 0) { - this.jobs.addAll(0, jobs) - return - } + override fun resume(worktime: Long): Boolean { + nextSuspensionTime = currentTimeMillis() + worktime - val timePerJob = (timeLeft + count - 1) / count - tickJob(job, time + timePerJob) - count-- + continuation?.let { + continuation = null + it.resume(Unit) + return continuation == null } - if (jobs.isEmpty() && this.jobs.isEmpty()) { - task?.cancel() - task = null + job?.let { + nextSuspensionTime = 0L + throw IllegalStateException() } - } - @Suppress("UNCHECKED_CAST") - private fun tickJob(job: JobDataImpl, endTime: Long) { - curJobData = job - curJobEndTime = endTime - try { - val next = job.next - if (next == null) startJob(job) - else next.resume(Unit) - } - finally { - curJobData = null - curJobEndTime = 0L + launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { + initJob(job = kotlin.coroutines.experimental.coroutineContext[Job]!!) + task() } - } - - private fun startJob(job: JobDataImpl) { - job.job = launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { job.task(this@TickWorktimeLimiter) } - } - private fun scheduleContinuation(continuation: Continuation<Unit>): Any? { - curJobData!!.next = continuation - jobs.addLast(curJobData) - return COROUTINE_SUSPENDED + return continuation == null } } -data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) - - +/* /** * While the implementation of [kotlin.coroutines.experimental.intrinsics.intercepted] is intrinsic, it should look something like this * We don't care for intercepting the coroutine as we want it to resume immediately when we call resume(). @@ -231,4 +237,5 @@ data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) */ private fun <T> Continuation<T>.interceptedImpl(): Continuation<T> { return context[ContinuationInterceptor]?.interceptContinuation(this) ?: this -}
\ No newline at end of file +} + */ diff --git a/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt b/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt index 2de847c..c3f8836 100644 --- a/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt +++ b/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt @@ -7,13 +7,11 @@ import io.dico.dicore.command.annotation.Desc import io.dico.dicore.command.annotation.RequireParameters import io.dico.parcels2.ParcelOwner import io.dico.parcels2.ParcelsPlugin -import io.dico.parcels2.blockvisitor.JobUpdateListener import io.dico.parcels2.command.NamedParcelDefaultValue.FIRST_OWNED import io.dico.parcels2.storage.getParcelBySerializedValue import io.dico.parcels2.util.hasAdminManage import io.dico.parcels2.util.hasParcelHomeOthers import io.dico.parcels2.util.uuid -import kotlinx.coroutines.experimental.Job import org.bukkit.entity.Player //@Suppress("unused") @@ -84,11 +82,8 @@ class CommandsGeneral(plugin: ParcelsPlugin) : AbstractParcelCommands(plugin) { @Cmd("clear") @ParcelRequire(owner = true) fun ParcelScope.cmdClear(player: Player, context: ExecutionContext) { - val onProgressUpdate: JobUpdateListener = { progress, elapsedTime -> - context.sendMessage("[Clearing] Progress: %.06f%%".format(progress * 100)) - } world.generator.clearParcel(parcel) - .onProgressUpdate(5, 5) { progress, elapsedTime -> + .onProgressUpdate(1000, 1000) { progress, elapsedTime -> context.sendMessage(EMessageType.INFORMATIVE, "Clear progress: %.06f%%, %.2fs elapsed" .format(progress * 100, elapsedTime / 1000.0)) } |