diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage')
8 files changed, 258 insertions, 121 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/Backing.kt b/src/main/kotlin/io/dico/parcels2/storage/Backing.kt index 88ee5fd..bb4cf33 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/Backing.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/Backing.kt @@ -1,6 +1,12 @@ package io.dico.parcels2.storage import io.dico.parcels2.* +import kotlinx.coroutines.experimental.CoroutineDispatcher +import kotlinx.coroutines.experimental.CoroutineScope +import kotlinx.coroutines.experimental.Deferred +import kotlinx.coroutines.experimental.Job +import kotlinx.coroutines.experimental.channels.ProducerScope +import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel import java.util.UUID @@ -10,41 +16,58 @@ interface Backing { val isConnected: Boolean - suspend fun init() + fun launchJob(job: Backing.() -> Unit): Job - suspend fun shutdown() + fun <T> launchFuture(future: Backing.() -> T): Deferred<T> + + fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> + + + fun init() + + fun shutdown() /** * This producer function is capable of constantly reading parcels from a potentially infinite sequence, * and provide parcel data for it as read from the database. */ - suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) + fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) + + fun produceAllParcelData(channel: SendChannel<DataPair>) + + fun readParcelData(parcel: ParcelId): ParcelData? + + fun getOwnedParcels(user: ParcelOwner): List<ParcelId> + + fun getNumParcels(user: ParcelOwner): Int = getOwnedParcels(user).size + - suspend fun produceAllParcelData(channel: SendChannel<DataPair>) + fun setParcelData(parcel: ParcelId, data: ParcelData?) - suspend fun readParcelData(parcel: ParcelId): ParcelData? + fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) - suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> + fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) - suspend fun getNumParcels(user: ParcelOwner): Int = getOwnedParcels(user).size + fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) + fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) - suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) - suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) + fun produceAllGlobalAddedData(channel: SendChannel<AddedDataPair<ParcelOwner>>) - suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) + fun readGlobalAddedData(owner: ParcelOwner): MutableAddedDataMap - suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) + fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) - suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) +} +abstract class AbstractBacking(val dispatcher: CoroutineDispatcher) { - suspend fun produceAllGlobalAddedData(channel: SendChannel<AddedDataPair<ParcelOwner>>) + fun launchJob(job: Backing.() -> Unit): Job - suspend fun readGlobalAddedData(owner: ParcelOwner): MutableAddedDataMap + fun <T> launchFuture(future: Backing.() -> T): Deferred<T> - suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) + fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> -}
\ No newline at end of file +} diff --git a/src/main/kotlin/io/dico/parcels2/storage/Storage.kt b/src/main/kotlin/io/dico/parcels2/storage/Storage.kt index 6c3d68f..6770d99 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/Storage.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/Storage.kt @@ -3,21 +3,16 @@ package io.dico.parcels2.storage import io.dico.parcels2.* -import kotlinx.coroutines.experimental.* -import kotlinx.coroutines.experimental.channels.ProducerScope +import kotlinx.coroutines.experimental.Deferred +import kotlinx.coroutines.experimental.Job import kotlinx.coroutines.experimental.channels.ReceiveChannel -import kotlinx.coroutines.experimental.channels.produce import java.util.UUID -import java.util.concurrent.Executor -import java.util.concurrent.Executors typealias DataPair = Pair<ParcelId, ParcelData?> typealias AddedDataPair<TAttach> = Pair<TAttach, MutableAddedDataMap> interface Storage { val name: String - val syncDispatcher: CoroutineDispatcher - val asyncDispatcher: CoroutineDispatcher val isConnected: Boolean fun init(): Job @@ -54,55 +49,39 @@ interface Storage { fun setGlobalAddedStatus(owner: ParcelOwner, player: UUID, status: AddedStatus): Job } -class StorageWithCoroutineBacking internal constructor(val backing: Backing) : Storage { - override val name get() = backing.name - override val syncDispatcher = Executor { it.run() }.asCoroutineDispatcher() - val poolSize: Int get() = 4 - override val asyncDispatcher = Executors.newFixedThreadPool(poolSize) { Thread(it, "Parcels2_StorageThread") }.asCoroutineDispatcher() - override val isConnected get() = backing.isConnected - val channelCapacity = 16 +class BackedStorage internal constructor(val b: Backing) : Storage { + override val name get() = b.name + override val isConnected get() = b.isConnected - private inline fun <T> defer(noinline block: suspend CoroutineScope.() -> T): Deferred<T> { - return async(context = asyncDispatcher, start = CoroutineStart.ATOMIC, block = block) - } + override fun init() = b.launchJob { init() } - private inline fun job(noinline block: suspend CoroutineScope.() -> Unit): Job { - return launch(context = asyncDispatcher, start = CoroutineStart.ATOMIC, block = block) - } + override fun shutdown() = b.launchJob { shutdown() } - private inline fun <T> openChannel(noinline block: suspend ProducerScope<T>.() -> Unit): ReceiveChannel<T> { - return produce(asyncDispatcher, capacity = channelCapacity, block = block) - } - override fun init() = job { backing.init() } + override fun readParcelData(parcel: ParcelId) = b.launchFuture { readParcelData(parcel) } - override fun shutdown() = job { backing.shutdown() } + override fun readParcelData(parcels: Sequence<ParcelId>) = b.openChannel<DataPair> { produceParcelData(it, parcels) } + override fun readAllParcelData() = b.openChannel<DataPair> { produceAllParcelData(it) } - override fun readParcelData(parcel: ParcelId) = defer { backing.readParcelData(parcel) } + override fun getOwnedParcels(user: ParcelOwner) = b.launchFuture { getOwnedParcels(user) } - override fun readParcelData(parcels: Sequence<ParcelId>) = openChannel<DataPair> { backing.produceParcelData(channel, parcels) } + override fun getNumParcels(user: ParcelOwner) = b.launchFuture { getNumParcels(user) } - override fun readAllParcelData() = openChannel<DataPair> { backing.produceAllParcelData(channel) } + override fun setParcelData(parcel: ParcelId, data: ParcelData?) = b.launchJob { setParcelData(parcel, data) } - override fun getOwnedParcels(user: ParcelOwner) = defer { backing.getOwnedParcels(user) } + override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = b.launchJob { setParcelOwner(parcel, owner) } - override fun getNumParcels(user: ParcelOwner) = defer { backing.getNumParcels(user) } + override fun setParcelPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = b.launchJob { setLocalPlayerStatus(parcel, player, status) } - override fun setParcelData(parcel: ParcelId, data: ParcelData?) = job { backing.setParcelData(parcel, data) } + override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) = b.launchJob { setParcelAllowsInteractInventory(parcel, value) } - override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = job { backing.setParcelOwner(parcel, owner) } + override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) = b.launchJob { setParcelAllowsInteractInputs(parcel, value) } - override fun setParcelPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = job { backing.setLocalPlayerStatus(parcel, player, status) } - override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) = job { backing.setParcelAllowsInteractInventory(parcel, value) } + override fun readAllGlobalAddedData(): ReceiveChannel<AddedDataPair<ParcelOwner>> = b.openChannel { produceAllGlobalAddedData(it) } - override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) = job { backing.setParcelAllowsInteractInputs(parcel, value) } + override fun readGlobalAddedData(owner: ParcelOwner): Deferred<MutableAddedDataMap?> = b.launchFuture { readGlobalAddedData(owner) } - - override fun readAllGlobalAddedData(): ReceiveChannel<AddedDataPair<ParcelOwner>> = openChannel { backing.produceAllGlobalAddedData(channel) } - - override fun readGlobalAddedData(owner: ParcelOwner): Deferred<MutableAddedDataMap?> = defer { backing.readGlobalAddedData(owner) } - - override fun setGlobalAddedStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = job { backing.setGlobalPlayerStatus(owner, player, status) } + override fun setGlobalAddedStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = b.launchJob { setGlobalPlayerStatus(owner, player, status) } } diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt new file mode 100644 index 0000000..ab707af --- /dev/null +++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt @@ -0,0 +1,118 @@ +package io.dico.parcels2.storage.exposed + +import kotlinx.coroutines.experimental.* +import org.jetbrains.exposed.sql.* +import org.jetbrains.exposed.sql.statements.StatementContext +import org.jetbrains.exposed.sql.statements.StatementInterceptor +import org.jetbrains.exposed.sql.statements.expandArgs +import org.jetbrains.exposed.sql.transactions.* +import org.slf4j.LoggerFactory +import java.sql.Connection +import kotlin.coroutines.experimental.CoroutineContext + +fun <T> ctransaction(db: Database? = null, statement: suspend Transaction.() -> T): T { + return ctransaction(TransactionManager.manager.defaultIsolationLevel, 3, db, statement) +} + +fun <T> ctransaction(transactionIsolation: Int, repetitionAttempts: Int, db: Database? = null, statement: suspend Transaction.() -> T): T { + return transaction(transactionIsolation, repetitionAttempts, db) { + if (this !is CoroutineTransaction) throw IllegalStateException("ctransaction requires CoroutineTransactionManager.") + + val job = async(context = manager.context, start = CoroutineStart.UNDISPATCHED) { + this@transaction.statement() + } + + if (job.isActive) { + runBlocking(context = Unconfined) { + job.join() + } + } + + job.getCompleted() + } +} + +class CoroutineTransactionManager(private val db: Database, + dispatcher: CoroutineDispatcher, + override var defaultIsolationLevel: Int = DEFAULT_ISOLATION_LEVEL) : TransactionManager { + val context: CoroutineDispatcher = TransactionCoroutineDispatcher(dispatcher) + private val transaction = ThreadLocal<CoroutineTransaction?>() + + override fun currentOrNull(): Transaction? { + + + return transaction.get() + ?: null + } + + override fun newTransaction(isolation: Int): Transaction { + return CoroutineTransaction(this, CoroutineTransactionInterface(db, isolation, transaction)).also { transaction.set(it) } + } + + private inner class TransactionCoroutineDispatcher(val delegate: CoroutineDispatcher) : CoroutineDispatcher() { + + // When the thread changes, move the transaction to the new thread + override fun dispatch(context: CoroutineContext, block: Runnable) { + val existing = transaction.get() + + val newContext: CoroutineContext + if (existing != null) { + transaction.set(null) + newContext = context // + existing + } else { + newContext = context + } + + delegate.dispatch(newContext, Runnable { + if (existing != null) { + transaction.set(existing) + } + + block.run() + }) + } + + } + +} + +private class CoroutineTransaction(val manager: CoroutineTransactionManager, + itf: CoroutineTransactionInterface) : Transaction(itf), CoroutineContext.Element { + companion object Key : CoroutineContext.Key<CoroutineTransaction> + + override val key: CoroutineContext.Key<CoroutineTransaction> = Key +} + +private class CoroutineTransactionInterface(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<CoroutineTransaction?>) : TransactionInterface { + private val connectionLazy = lazy(LazyThreadSafetyMode.NONE) { + db.connector().apply { + autoCommit = false + transactionIsolation = isolation + } + } + override val connection: Connection + get() = connectionLazy.value + + override val outerTransaction: CoroutineTransaction? = threadLocal.get() + + override fun commit() { + if (connectionLazy.isInitialized()) + connection.commit() + } + + override fun rollback() { + if (connectionLazy.isInitialized() && !connection.isClosed) { + connection.rollback() + } + } + + override fun close() { + try { + if (connectionLazy.isInitialized()) connection.close() + } finally { + threadLocal.set(outerTransaction) + } + } + +} + diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt index 5685346..01afd94 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt @@ -1,4 +1,4 @@ -@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName") +@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName", "UNUSED_EXPRESSION") package io.dico.parcels2.storage.exposed @@ -7,10 +7,10 @@ import io.dico.parcels2.* import io.dico.parcels2.storage.Backing import io.dico.parcels2.storage.DataPair import io.dico.parcels2.util.toUUID -import kotlinx.coroutines.experimental.CoroutineStart -import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.LinkedListChannel +import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -import kotlinx.coroutines.experimental.launch import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.SchemaUtils.create import org.jetbrains.exposed.sql.transactions.transaction @@ -21,14 +21,27 @@ import javax.sql.DataSource class ExposedDatabaseException(message: String? = null) : Exception(message) -class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : Backing { +class ExposedBacking(private val dataSourceFactory: () -> DataSource, + private val poolSize: Int) : Backing { override val name get() = "Exposed" + val dispatcher: CoroutineDispatcher = newFixedThreadPoolContext(4, "Parcels StorageThread") + private var dataSource: DataSource? = null private var database: Database? = null private var isShutdown: Boolean = false - override val isConnected get() = database != null + override fun launchJob(job: Backing.() -> Unit): Job = launch(dispatcher) { transaction { job() } } + override fun <T> launchFuture(future: Backing.() -> T): Deferred<T> = async(dispatcher) { transaction { future() } } + + override fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> { + val channel = LinkedListChannel<T>() + launchJob { future(channel) } + return channel + } + + private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement) + companion object { init { Database.registerDialect("mariadb") { @@ -37,24 +50,17 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement) - - private suspend fun transactionLaunch(statement: suspend Transaction.() -> Unit): Unit = transaction(database!!) { - launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) { - statement(this@transaction) - } - } + override fun init() { + if (isShutdown || isConnected) throw IllegalStateException() - override suspend fun init() { - if (isShutdown) throw IllegalStateException() dataSource = dataSourceFactory() database = Database.connect(dataSource!!) - transaction(database) { + transaction(database!!) { create(WorldsT, OwnersT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT) } } - override suspend fun shutdown() { + override fun shutdown() { if (isShutdown) throw IllegalStateException() dataSource?.let { (it as? HikariDataSource)?.close() @@ -63,15 +69,15 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : isShutdown = true } - override suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) { + override fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) { for (parcel in parcels) { val data = readParcelData(parcel) - channel.send(parcel to data) + channel.offer(parcel to data) } channel.close() } - override suspend fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = transactionLaunch { + override fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = ctransaction<Unit> { ParcelsT.selectAll().forEach { row -> val parcel = ParcelsT.getId(row) ?: return@forEach val data = rowToParcelData(row) @@ -80,12 +86,12 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : channel.close() } - override suspend fun readParcelData(parcel: ParcelId): ParcelData? = transaction { + override fun readParcelData(parcel: ParcelId): ParcelData? = transaction { val row = ParcelsT.getRow(parcel) ?: return@transaction null rowToParcelData(row) } - override suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction { + override fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction { val user_id = OwnersT.getId(user) ?: return@transaction emptyList() ParcelsT.select { ParcelsT.owner_id eq user_id } .orderBy(ParcelsT.claim_time, isAsc = true) @@ -93,7 +99,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : .toList() } - override suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) { + override fun setParcelData(parcel: ParcelId, data: ParcelData?) { if (data == null) { transaction { ParcelsT.getId(parcel)?.let { id -> @@ -125,7 +131,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : setParcelAllowsInteractInventory(parcel, data.allowInteractInventory) } - override suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction { + override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction { val id = if (owner == null) ParcelsT.getId(parcel) ?: return@transaction else @@ -140,11 +146,11 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction { + override fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction { AddedLocalT.setPlayerStatus(parcel, player, status) } - override suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction { + override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction { val id = ParcelsT.getOrInitId(parcel) ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) { it[ParcelOptionsT.parcel_id] = id @@ -152,7 +158,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction { + override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction { val id = ParcelsT.getOrInitId(parcel) ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) { it[ParcelOptionsT.parcel_id] = id @@ -160,16 +166,16 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = transactionLaunch { + override fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = ctransaction<Unit> { AddedGlobalT.sendAllAddedData(channel) channel.close() } - override suspend fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction { + override fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction { return@transaction AddedGlobalT.readAddedData(OwnersT.getId(owner) ?: return@transaction hashMapOf()) } - override suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction { + override fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction { AddedGlobalT.setPlayerStatus(owner, player, status) } diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt index 9f7f599..ce66644 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt @@ -6,6 +6,7 @@ import org.jetbrains.exposed.sql.Table import org.jetbrains.exposed.sql.Transaction import org.jetbrains.exposed.sql.statements.InsertStatement import org.jetbrains.exposed.sql.transactions.TransactionManager +import org.jetbrains.exposed.sql.transactions.transaction class UpsertStatement<Key : Any>(table: Table, conflictColumn: Column<*>? = null, conflictIndex: Index? = null) : InsertStatement<Key>(table, false) { @@ -61,3 +62,4 @@ fun Table.indexR(customIndexName: String? = null, isUnique: Boolean = false, var } fun Table.uniqueIndexR(customIndexName: String? = null, vararg columns: Column<*>): Index = indexR(customIndexName, true, *columns) + diff --git a/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt b/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt index c8bc93c..0db669a 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt @@ -1,8 +1,9 @@ package io.dico.parcels2.storage.migration import io.dico.parcels2.storage.Storage +import kotlinx.coroutines.experimental.Job interface Migration { - fun migrateTo(storage: Storage) + fun migrateTo(storage: Storage): Job } diff --git a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt index e5b7d9d..1f6e49c 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt @@ -1,50 +1,50 @@ +@file:Suppress("RedundantSuspendModifier", "DEPRECATION") + package io.dico.parcels2.storage.migration.plotme import com.zaxxer.hikari.HikariDataSource import io.dico.parcels2.* +import io.dico.parcels2.options.PlotmeMigrationOptions import io.dico.parcels2.storage.Storage import io.dico.parcels2.storage.migration.Migration import io.dico.parcels2.util.Vec2i import io.dico.parcels2.util.isValid import io.dico.parcels2.util.toUUID import io.dico.parcels2.util.uuid -import kotlinx.coroutines.experimental.asCoroutineDispatcher -import kotlinx.coroutines.experimental.launch +import kotlinx.coroutines.experimental.* import org.bukkit.Bukkit import org.jetbrains.exposed.sql.* import org.slf4j.LoggerFactory import java.io.ByteArrayOutputStream import java.sql.Blob import java.util.UUID -import java.util.concurrent.Executors +import java.util.concurrent.ConcurrentHashMap import javax.sql.DataSource +import kotlin.coroutines.experimental.coroutineContext -class PlotmeMigration(val parcelProvider: ParcelProvider, - val worldMapper: Map<String, String>, - val dataSourceFactory: () -> DataSource) : Migration { +class PlotmeMigration(val options: PlotmeMigrationOptions) : Migration { private var dataSource: DataSource? = null private var database: Database? = null private var isShutdown: Boolean = false - private val dispatcher = Executors.newSingleThreadExecutor { Thread(it, "PlotMe Migration Thread") }.asCoroutineDispatcher() private val mlogger = LoggerFactory.getLogger("PlotMe Migrator") private fun <T> transaction(statement: Transaction.() -> T) = org.jetbrains.exposed.sql.transactions.transaction(database!!, statement) - override fun migrateTo(storage: Storage) { - launch(context = dispatcher) { + override fun migrateTo(storage: Storage): Job { + return launch(context = storage.asyncDispatcher) { init() - doWork(storage) + transaction { launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) { doWork(storage) } } shutdown() } } - fun init() { + suspend fun init() { if (isShutdown) throw IllegalStateException() - dataSource = dataSourceFactory() + dataSource = options.storage.getDataSourceFactory()!!() database = Database.connect(dataSource!!) } - fun shutdown() { + suspend fun shutdown() { if (isShutdown) throw IllegalStateException() dataSource?.let { (it as? HikariDataSource)?.close() @@ -53,22 +53,23 @@ class PlotmeMigration(val parcelProvider: ParcelProvider, isShutdown = true } - val parcelsCache = hashMapOf<String, MutableMap<Vec2i, ParcelData>>() + private val parcelsCache = hashMapOf<String, MutableMap<Vec2i, ParcelData>>() private fun getMap(worldName: String): MutableMap<Vec2i, ParcelData>? { - val mapped = worldMapper[worldName] ?: return null + val mapped = options.worldsFromTo[worldName] ?: return null return parcelsCache.computeIfAbsent(mapped) { mutableMapOf() } } private fun getData(worldName: String, position: Vec2i): ParcelData? { - return getMap(worldName)?.computeIfAbsent(position) { ParcelDataHolder() } + return getMap(worldName)?.computeIfAbsent(position) { ParcelDataHolder(addedMap = ConcurrentHashMap()) } } - fun doWork(target: Storage): Unit = transaction { + suspend fun doWork(target: Storage): Unit { if (!PlotmePlotsT.exists()) { mlogger.warn("Plotme tables don't appear to exist. Exiting.") - return@transaction + return } + parcelsCache.clear() iterPlotmeTable(PlotmePlotsT) { data, row -> @@ -76,22 +77,29 @@ class PlotmeMigration(val parcelProvider: ParcelProvider, data.owner = ParcelOwner(row[owner_uuid]?.toUUID(), row[owner_name]) } - iterPlotmeTable(PlotmeAllowedT) { data, row -> - val uuid = row[player_uuid]?.toUUID() - ?: Bukkit.getOfflinePlayer(row[player_name]).takeIf { it.isValid }?.uuid - ?: return@iterPlotmeTable + launch(context = target.asyncDispatcher) { + iterPlotmeTable(PlotmeAllowedT) { data, row -> + val uuid = row[player_uuid]?.toUUID() + ?: Bukkit.getOfflinePlayer(row[player_name]).takeIf { it.isValid }?.uuid + ?: return@iterPlotmeTable - data.setAddedStatus(uuid, AddedStatus.ALLOWED) + data.setAddedStatus(uuid, AddedStatus.ALLOWED) + } } - iterPlotmeTable(PlotmeDeniedT) { data, row -> - val uuid = row[PlotmeAllowedT.player_uuid]?.toUUID() - ?: Bukkit.getOfflinePlayer(row[PlotmeAllowedT.player_name]).takeIf { it.isValid }?.uuid - ?: return@iterPlotmeTable + launch(context = target.asyncDispatcher) { + iterPlotmeTable(PlotmeDeniedT) { data, row -> + val uuid = row[player_uuid]?.toUUID() + ?: Bukkit.getOfflinePlayer(row[player_name]).takeIf { it.isValid }?.uuid + ?: return@iterPlotmeTable - data.setAddedStatus(uuid, AddedStatus.BANNED) + data.setAddedStatus(uuid, AddedStatus.BANNED) + } } + println(coroutineContext[Job]!!.children) + coroutineContext[Job]!!.joinChildren() + for ((worldName, map) in parcelsCache) { val world = ParcelWorldId(worldName) for ((pos, data) in map) { diff --git a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt index 3d07955..8564ad3 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt @@ -7,9 +7,9 @@ const val uppercase: Boolean = false fun String.toCorrectCase() = if (uppercase) this else toLowerCase() sealed class PlotmeTable(name: String) : Table(name) { - val px = PlotmePlotsT.integer("idX").primaryKey() - val pz = PlotmePlotsT.integer("idZ").primaryKey() - val world_name = PlotmePlotsT.varchar("world", 32).primaryKey() + val px = integer("idX").primaryKey() + val pz = integer("idZ").primaryKey() + val world_name = varchar("world", 32).primaryKey() } object PlotmePlotsT : PlotmeTable("plotmePlots".toCorrectCase()) { @@ -18,8 +18,8 @@ object PlotmePlotsT : PlotmeTable("plotmePlots".toCorrectCase()) { } sealed class PlotmePlotPlayerMap(name: String) : PlotmeTable(name) { - val player_name = PlotmePlotsT.varchar("player", 32) - val player_uuid = PlotmePlotsT.blob("playerid").nullable() + val player_name = varchar("player", 32) + val player_uuid = blob("playerid").nullable() } object PlotmeAllowedT : PlotmePlotPlayerMap("plotmeAllowed".toCorrectCase()) |