summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/storage
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage')
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/Backing.kt55
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/Storage.kt61
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt118
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt66
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt2
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt3
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt64
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt10
8 files changed, 258 insertions, 121 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/Backing.kt b/src/main/kotlin/io/dico/parcels2/storage/Backing.kt
index 88ee5fd..bb4cf33 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/Backing.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/Backing.kt
@@ -1,6 +1,12 @@
package io.dico.parcels2.storage
import io.dico.parcels2.*
+import kotlinx.coroutines.experimental.CoroutineDispatcher
+import kotlinx.coroutines.experimental.CoroutineScope
+import kotlinx.coroutines.experimental.Deferred
+import kotlinx.coroutines.experimental.Job
+import kotlinx.coroutines.experimental.channels.ProducerScope
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import java.util.UUID
@@ -10,41 +16,58 @@ interface Backing {
val isConnected: Boolean
- suspend fun init()
+ fun launchJob(job: Backing.() -> Unit): Job
- suspend fun shutdown()
+ fun <T> launchFuture(future: Backing.() -> T): Deferred<T>
+
+ fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T>
+
+
+ fun init()
+
+ fun shutdown()
/**
* This producer function is capable of constantly reading parcels from a potentially infinite sequence,
* and provide parcel data for it as read from the database.
*/
- suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>)
+ fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>)
+
+ fun produceAllParcelData(channel: SendChannel<DataPair>)
+
+ fun readParcelData(parcel: ParcelId): ParcelData?
+
+ fun getOwnedParcels(user: ParcelOwner): List<ParcelId>
+
+ fun getNumParcels(user: ParcelOwner): Int = getOwnedParcels(user).size
+
- suspend fun produceAllParcelData(channel: SendChannel<DataPair>)
+ fun setParcelData(parcel: ParcelId, data: ParcelData?)
- suspend fun readParcelData(parcel: ParcelId): ParcelData?
+ fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?)
- suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId>
+ fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus)
- suspend fun getNumParcels(user: ParcelOwner): Int = getOwnedParcels(user).size
+ fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean)
+ fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean)
- suspend fun setParcelData(parcel: ParcelId, data: ParcelData?)
- suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?)
+ fun produceAllGlobalAddedData(channel: SendChannel<AddedDataPair<ParcelOwner>>)
- suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus)
+ fun readGlobalAddedData(owner: ParcelOwner): MutableAddedDataMap
- suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean)
+ fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus)
- suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean)
+}
+abstract class AbstractBacking(val dispatcher: CoroutineDispatcher) {
- suspend fun produceAllGlobalAddedData(channel: SendChannel<AddedDataPair<ParcelOwner>>)
+ fun launchJob(job: Backing.() -> Unit): Job
- suspend fun readGlobalAddedData(owner: ParcelOwner): MutableAddedDataMap
+ fun <T> launchFuture(future: Backing.() -> T): Deferred<T>
- suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus)
+ fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T>
-} \ No newline at end of file
+}
diff --git a/src/main/kotlin/io/dico/parcels2/storage/Storage.kt b/src/main/kotlin/io/dico/parcels2/storage/Storage.kt
index 6c3d68f..6770d99 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/Storage.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/Storage.kt
@@ -3,21 +3,16 @@
package io.dico.parcels2.storage
import io.dico.parcels2.*
-import kotlinx.coroutines.experimental.*
-import kotlinx.coroutines.experimental.channels.ProducerScope
+import kotlinx.coroutines.experimental.Deferred
+import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.channels.produce
import java.util.UUID
-import java.util.concurrent.Executor
-import java.util.concurrent.Executors
typealias DataPair = Pair<ParcelId, ParcelData?>
typealias AddedDataPair<TAttach> = Pair<TAttach, MutableAddedDataMap>
interface Storage {
val name: String
- val syncDispatcher: CoroutineDispatcher
- val asyncDispatcher: CoroutineDispatcher
val isConnected: Boolean
fun init(): Job
@@ -54,55 +49,39 @@ interface Storage {
fun setGlobalAddedStatus(owner: ParcelOwner, player: UUID, status: AddedStatus): Job
}
-class StorageWithCoroutineBacking internal constructor(val backing: Backing) : Storage {
- override val name get() = backing.name
- override val syncDispatcher = Executor { it.run() }.asCoroutineDispatcher()
- val poolSize: Int get() = 4
- override val asyncDispatcher = Executors.newFixedThreadPool(poolSize) { Thread(it, "Parcels2_StorageThread") }.asCoroutineDispatcher()
- override val isConnected get() = backing.isConnected
- val channelCapacity = 16
+class BackedStorage internal constructor(val b: Backing) : Storage {
+ override val name get() = b.name
+ override val isConnected get() = b.isConnected
- private inline fun <T> defer(noinline block: suspend CoroutineScope.() -> T): Deferred<T> {
- return async(context = asyncDispatcher, start = CoroutineStart.ATOMIC, block = block)
- }
+ override fun init() = b.launchJob { init() }
- private inline fun job(noinline block: suspend CoroutineScope.() -> Unit): Job {
- return launch(context = asyncDispatcher, start = CoroutineStart.ATOMIC, block = block)
- }
+ override fun shutdown() = b.launchJob { shutdown() }
- private inline fun <T> openChannel(noinline block: suspend ProducerScope<T>.() -> Unit): ReceiveChannel<T> {
- return produce(asyncDispatcher, capacity = channelCapacity, block = block)
- }
- override fun init() = job { backing.init() }
+ override fun readParcelData(parcel: ParcelId) = b.launchFuture { readParcelData(parcel) }
- override fun shutdown() = job { backing.shutdown() }
+ override fun readParcelData(parcels: Sequence<ParcelId>) = b.openChannel<DataPair> { produceParcelData(it, parcels) }
+ override fun readAllParcelData() = b.openChannel<DataPair> { produceAllParcelData(it) }
- override fun readParcelData(parcel: ParcelId) = defer { backing.readParcelData(parcel) }
+ override fun getOwnedParcels(user: ParcelOwner) = b.launchFuture { getOwnedParcels(user) }
- override fun readParcelData(parcels: Sequence<ParcelId>) = openChannel<DataPair> { backing.produceParcelData(channel, parcels) }
+ override fun getNumParcels(user: ParcelOwner) = b.launchFuture { getNumParcels(user) }
- override fun readAllParcelData() = openChannel<DataPair> { backing.produceAllParcelData(channel) }
+ override fun setParcelData(parcel: ParcelId, data: ParcelData?) = b.launchJob { setParcelData(parcel, data) }
- override fun getOwnedParcels(user: ParcelOwner) = defer { backing.getOwnedParcels(user) }
+ override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = b.launchJob { setParcelOwner(parcel, owner) }
- override fun getNumParcels(user: ParcelOwner) = defer { backing.getNumParcels(user) }
+ override fun setParcelPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = b.launchJob { setLocalPlayerStatus(parcel, player, status) }
- override fun setParcelData(parcel: ParcelId, data: ParcelData?) = job { backing.setParcelData(parcel, data) }
+ override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) = b.launchJob { setParcelAllowsInteractInventory(parcel, value) }
- override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = job { backing.setParcelOwner(parcel, owner) }
+ override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) = b.launchJob { setParcelAllowsInteractInputs(parcel, value) }
- override fun setParcelPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = job { backing.setLocalPlayerStatus(parcel, player, status) }
- override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) = job { backing.setParcelAllowsInteractInventory(parcel, value) }
+ override fun readAllGlobalAddedData(): ReceiveChannel<AddedDataPair<ParcelOwner>> = b.openChannel { produceAllGlobalAddedData(it) }
- override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) = job { backing.setParcelAllowsInteractInputs(parcel, value) }
+ override fun readGlobalAddedData(owner: ParcelOwner): Deferred<MutableAddedDataMap?> = b.launchFuture { readGlobalAddedData(owner) }
-
- override fun readAllGlobalAddedData(): ReceiveChannel<AddedDataPair<ParcelOwner>> = openChannel { backing.produceAllGlobalAddedData(channel) }
-
- override fun readGlobalAddedData(owner: ParcelOwner): Deferred<MutableAddedDataMap?> = defer { backing.readGlobalAddedData(owner) }
-
- override fun setGlobalAddedStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = job { backing.setGlobalPlayerStatus(owner, player, status) }
+ override fun setGlobalAddedStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = b.launchJob { setGlobalPlayerStatus(owner, player, status) }
}
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt
new file mode 100644
index 0000000..ab707af
--- /dev/null
+++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt
@@ -0,0 +1,118 @@
+package io.dico.parcels2.storage.exposed
+
+import kotlinx.coroutines.experimental.*
+import org.jetbrains.exposed.sql.*
+import org.jetbrains.exposed.sql.statements.StatementContext
+import org.jetbrains.exposed.sql.statements.StatementInterceptor
+import org.jetbrains.exposed.sql.statements.expandArgs
+import org.jetbrains.exposed.sql.transactions.*
+import org.slf4j.LoggerFactory
+import java.sql.Connection
+import kotlin.coroutines.experimental.CoroutineContext
+
+fun <T> ctransaction(db: Database? = null, statement: suspend Transaction.() -> T): T {
+ return ctransaction(TransactionManager.manager.defaultIsolationLevel, 3, db, statement)
+}
+
+fun <T> ctransaction(transactionIsolation: Int, repetitionAttempts: Int, db: Database? = null, statement: suspend Transaction.() -> T): T {
+ return transaction(transactionIsolation, repetitionAttempts, db) {
+ if (this !is CoroutineTransaction) throw IllegalStateException("ctransaction requires CoroutineTransactionManager.")
+
+ val job = async(context = manager.context, start = CoroutineStart.UNDISPATCHED) {
+ this@transaction.statement()
+ }
+
+ if (job.isActive) {
+ runBlocking(context = Unconfined) {
+ job.join()
+ }
+ }
+
+ job.getCompleted()
+ }
+}
+
+class CoroutineTransactionManager(private val db: Database,
+ dispatcher: CoroutineDispatcher,
+ override var defaultIsolationLevel: Int = DEFAULT_ISOLATION_LEVEL) : TransactionManager {
+ val context: CoroutineDispatcher = TransactionCoroutineDispatcher(dispatcher)
+ private val transaction = ThreadLocal<CoroutineTransaction?>()
+
+ override fun currentOrNull(): Transaction? {
+
+
+ return transaction.get()
+ ?: null
+ }
+
+ override fun newTransaction(isolation: Int): Transaction {
+ return CoroutineTransaction(this, CoroutineTransactionInterface(db, isolation, transaction)).also { transaction.set(it) }
+ }
+
+ private inner class TransactionCoroutineDispatcher(val delegate: CoroutineDispatcher) : CoroutineDispatcher() {
+
+ // When the thread changes, move the transaction to the new thread
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ val existing = transaction.get()
+
+ val newContext: CoroutineContext
+ if (existing != null) {
+ transaction.set(null)
+ newContext = context // + existing
+ } else {
+ newContext = context
+ }
+
+ delegate.dispatch(newContext, Runnable {
+ if (existing != null) {
+ transaction.set(existing)
+ }
+
+ block.run()
+ })
+ }
+
+ }
+
+}
+
+private class CoroutineTransaction(val manager: CoroutineTransactionManager,
+ itf: CoroutineTransactionInterface) : Transaction(itf), CoroutineContext.Element {
+ companion object Key : CoroutineContext.Key<CoroutineTransaction>
+
+ override val key: CoroutineContext.Key<CoroutineTransaction> = Key
+}
+
+private class CoroutineTransactionInterface(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<CoroutineTransaction?>) : TransactionInterface {
+ private val connectionLazy = lazy(LazyThreadSafetyMode.NONE) {
+ db.connector().apply {
+ autoCommit = false
+ transactionIsolation = isolation
+ }
+ }
+ override val connection: Connection
+ get() = connectionLazy.value
+
+ override val outerTransaction: CoroutineTransaction? = threadLocal.get()
+
+ override fun commit() {
+ if (connectionLazy.isInitialized())
+ connection.commit()
+ }
+
+ override fun rollback() {
+ if (connectionLazy.isInitialized() && !connection.isClosed) {
+ connection.rollback()
+ }
+ }
+
+ override fun close() {
+ try {
+ if (connectionLazy.isInitialized()) connection.close()
+ } finally {
+ threadLocal.set(outerTransaction)
+ }
+ }
+
+}
+
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
index 5685346..01afd94 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
@@ -1,4 +1,4 @@
-@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName")
+@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName", "UNUSED_EXPRESSION")
package io.dico.parcels2.storage.exposed
@@ -7,10 +7,10 @@ import io.dico.parcels2.*
import io.dico.parcels2.storage.Backing
import io.dico.parcels2.storage.DataPair
import io.dico.parcels2.util.toUUID
-import kotlinx.coroutines.experimental.CoroutineStart
-import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.LinkedListChannel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
-import kotlinx.coroutines.experimental.launch
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SchemaUtils.create
import org.jetbrains.exposed.sql.transactions.transaction
@@ -21,14 +21,27 @@ import javax.sql.DataSource
class ExposedDatabaseException(message: String? = null) : Exception(message)
-class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : Backing {
+class ExposedBacking(private val dataSourceFactory: () -> DataSource,
+ private val poolSize: Int) : Backing {
override val name get() = "Exposed"
+ val dispatcher: CoroutineDispatcher = newFixedThreadPoolContext(4, "Parcels StorageThread")
+
private var dataSource: DataSource? = null
private var database: Database? = null
private var isShutdown: Boolean = false
-
override val isConnected get() = database != null
+ override fun launchJob(job: Backing.() -> Unit): Job = launch(dispatcher) { transaction { job() } }
+ override fun <T> launchFuture(future: Backing.() -> T): Deferred<T> = async(dispatcher) { transaction { future() } }
+
+ override fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> {
+ val channel = LinkedListChannel<T>()
+ launchJob { future(channel) }
+ return channel
+ }
+
+ private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement)
+
companion object {
init {
Database.registerDialect("mariadb") {
@@ -37,24 +50,17 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement)
-
- private suspend fun transactionLaunch(statement: suspend Transaction.() -> Unit): Unit = transaction(database!!) {
- launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) {
- statement(this@transaction)
- }
- }
+ override fun init() {
+ if (isShutdown || isConnected) throw IllegalStateException()
- override suspend fun init() {
- if (isShutdown) throw IllegalStateException()
dataSource = dataSourceFactory()
database = Database.connect(dataSource!!)
- transaction(database) {
+ transaction(database!!) {
create(WorldsT, OwnersT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT)
}
}
- override suspend fun shutdown() {
+ override fun shutdown() {
if (isShutdown) throw IllegalStateException()
dataSource?.let {
(it as? HikariDataSource)?.close()
@@ -63,15 +69,15 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
isShutdown = true
}
- override suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) {
+ override fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) {
for (parcel in parcels) {
val data = readParcelData(parcel)
- channel.send(parcel to data)
+ channel.offer(parcel to data)
}
channel.close()
}
- override suspend fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = transactionLaunch {
+ override fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = ctransaction<Unit> {
ParcelsT.selectAll().forEach { row ->
val parcel = ParcelsT.getId(row) ?: return@forEach
val data = rowToParcelData(row)
@@ -80,12 +86,12 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
channel.close()
}
- override suspend fun readParcelData(parcel: ParcelId): ParcelData? = transaction {
+ override fun readParcelData(parcel: ParcelId): ParcelData? = transaction {
val row = ParcelsT.getRow(parcel) ?: return@transaction null
rowToParcelData(row)
}
- override suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction {
+ override fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction {
val user_id = OwnersT.getId(user) ?: return@transaction emptyList()
ParcelsT.select { ParcelsT.owner_id eq user_id }
.orderBy(ParcelsT.claim_time, isAsc = true)
@@ -93,7 +99,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
.toList()
}
- override suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) {
+ override fun setParcelData(parcel: ParcelId, data: ParcelData?) {
if (data == null) {
transaction {
ParcelsT.getId(parcel)?.let { id ->
@@ -125,7 +131,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
setParcelAllowsInteractInventory(parcel, data.allowInteractInventory)
}
- override suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction {
+ override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction {
val id = if (owner == null)
ParcelsT.getId(parcel) ?: return@transaction
else
@@ -140,11 +146,11 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction {
+ override fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction {
AddedLocalT.setPlayerStatus(parcel, player, status)
}
- override suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction {
+ override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction {
val id = ParcelsT.getOrInitId(parcel)
ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) {
it[ParcelOptionsT.parcel_id] = id
@@ -152,7 +158,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction {
+ override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction {
val id = ParcelsT.getOrInitId(parcel)
ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) {
it[ParcelOptionsT.parcel_id] = id
@@ -160,16 +166,16 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = transactionLaunch {
+ override fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = ctransaction<Unit> {
AddedGlobalT.sendAllAddedData(channel)
channel.close()
}
- override suspend fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction {
+ override fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction {
return@transaction AddedGlobalT.readAddedData(OwnersT.getId(owner) ?: return@transaction hashMapOf())
}
- override suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction {
+ override fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction {
AddedGlobalT.setPlayerStatus(owner, player, status)
}
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt
index 9f7f599..ce66644 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedExtensions.kt
@@ -6,6 +6,7 @@ import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.statements.InsertStatement
import org.jetbrains.exposed.sql.transactions.TransactionManager
+import org.jetbrains.exposed.sql.transactions.transaction
class UpsertStatement<Key : Any>(table: Table, conflictColumn: Column<*>? = null, conflictIndex: Index? = null)
: InsertStatement<Key>(table, false) {
@@ -61,3 +62,4 @@ fun Table.indexR(customIndexName: String? = null, isUnique: Boolean = false, var
}
fun Table.uniqueIndexR(customIndexName: String? = null, vararg columns: Column<*>): Index = indexR(customIndexName, true, *columns)
+
diff --git a/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt b/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt
index c8bc93c..0db669a 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/migration/Migration.kt
@@ -1,8 +1,9 @@
package io.dico.parcels2.storage.migration
import io.dico.parcels2.storage.Storage
+import kotlinx.coroutines.experimental.Job
interface Migration {
- fun migrateTo(storage: Storage)
+ fun migrateTo(storage: Storage): Job
}
diff --git a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt
index e5b7d9d..1f6e49c 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeMigration.kt
@@ -1,50 +1,50 @@
+@file:Suppress("RedundantSuspendModifier", "DEPRECATION")
+
package io.dico.parcels2.storage.migration.plotme
import com.zaxxer.hikari.HikariDataSource
import io.dico.parcels2.*
+import io.dico.parcels2.options.PlotmeMigrationOptions
import io.dico.parcels2.storage.Storage
import io.dico.parcels2.storage.migration.Migration
import io.dico.parcels2.util.Vec2i
import io.dico.parcels2.util.isValid
import io.dico.parcels2.util.toUUID
import io.dico.parcels2.util.uuid
-import kotlinx.coroutines.experimental.asCoroutineDispatcher
-import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.*
import org.bukkit.Bukkit
import org.jetbrains.exposed.sql.*
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.sql.Blob
import java.util.UUID
-import java.util.concurrent.Executors
+import java.util.concurrent.ConcurrentHashMap
import javax.sql.DataSource
+import kotlin.coroutines.experimental.coroutineContext
-class PlotmeMigration(val parcelProvider: ParcelProvider,
- val worldMapper: Map<String, String>,
- val dataSourceFactory: () -> DataSource) : Migration {
+class PlotmeMigration(val options: PlotmeMigrationOptions) : Migration {
private var dataSource: DataSource? = null
private var database: Database? = null
private var isShutdown: Boolean = false
- private val dispatcher = Executors.newSingleThreadExecutor { Thread(it, "PlotMe Migration Thread") }.asCoroutineDispatcher()
private val mlogger = LoggerFactory.getLogger("PlotMe Migrator")
private fun <T> transaction(statement: Transaction.() -> T) = org.jetbrains.exposed.sql.transactions.transaction(database!!, statement)
- override fun migrateTo(storage: Storage) {
- launch(context = dispatcher) {
+ override fun migrateTo(storage: Storage): Job {
+ return launch(context = storage.asyncDispatcher) {
init()
- doWork(storage)
+ transaction { launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) { doWork(storage) } }
shutdown()
}
}
- fun init() {
+ suspend fun init() {
if (isShutdown) throw IllegalStateException()
- dataSource = dataSourceFactory()
+ dataSource = options.storage.getDataSourceFactory()!!()
database = Database.connect(dataSource!!)
}
- fun shutdown() {
+ suspend fun shutdown() {
if (isShutdown) throw IllegalStateException()
dataSource?.let {
(it as? HikariDataSource)?.close()
@@ -53,22 +53,23 @@ class PlotmeMigration(val parcelProvider: ParcelProvider,
isShutdown = true
}
- val parcelsCache = hashMapOf<String, MutableMap<Vec2i, ParcelData>>()
+ private val parcelsCache = hashMapOf<String, MutableMap<Vec2i, ParcelData>>()
private fun getMap(worldName: String): MutableMap<Vec2i, ParcelData>? {
- val mapped = worldMapper[worldName] ?: return null
+ val mapped = options.worldsFromTo[worldName] ?: return null
return parcelsCache.computeIfAbsent(mapped) { mutableMapOf() }
}
private fun getData(worldName: String, position: Vec2i): ParcelData? {
- return getMap(worldName)?.computeIfAbsent(position) { ParcelDataHolder() }
+ return getMap(worldName)?.computeIfAbsent(position) { ParcelDataHolder(addedMap = ConcurrentHashMap()) }
}
- fun doWork(target: Storage): Unit = transaction {
+ suspend fun doWork(target: Storage): Unit {
if (!PlotmePlotsT.exists()) {
mlogger.warn("Plotme tables don't appear to exist. Exiting.")
- return@transaction
+ return
}
+
parcelsCache.clear()
iterPlotmeTable(PlotmePlotsT) { data, row ->
@@ -76,22 +77,29 @@ class PlotmeMigration(val parcelProvider: ParcelProvider,
data.owner = ParcelOwner(row[owner_uuid]?.toUUID(), row[owner_name])
}
- iterPlotmeTable(PlotmeAllowedT) { data, row ->
- val uuid = row[player_uuid]?.toUUID()
- ?: Bukkit.getOfflinePlayer(row[player_name]).takeIf { it.isValid }?.uuid
- ?: return@iterPlotmeTable
+ launch(context = target.asyncDispatcher) {
+ iterPlotmeTable(PlotmeAllowedT) { data, row ->
+ val uuid = row[player_uuid]?.toUUID()
+ ?: Bukkit.getOfflinePlayer(row[player_name]).takeIf { it.isValid }?.uuid
+ ?: return@iterPlotmeTable
- data.setAddedStatus(uuid, AddedStatus.ALLOWED)
+ data.setAddedStatus(uuid, AddedStatus.ALLOWED)
+ }
}
- iterPlotmeTable(PlotmeDeniedT) { data, row ->
- val uuid = row[PlotmeAllowedT.player_uuid]?.toUUID()
- ?: Bukkit.getOfflinePlayer(row[PlotmeAllowedT.player_name]).takeIf { it.isValid }?.uuid
- ?: return@iterPlotmeTable
+ launch(context = target.asyncDispatcher) {
+ iterPlotmeTable(PlotmeDeniedT) { data, row ->
+ val uuid = row[player_uuid]?.toUUID()
+ ?: Bukkit.getOfflinePlayer(row[player_name]).takeIf { it.isValid }?.uuid
+ ?: return@iterPlotmeTable
- data.setAddedStatus(uuid, AddedStatus.BANNED)
+ data.setAddedStatus(uuid, AddedStatus.BANNED)
+ }
}
+ println(coroutineContext[Job]!!.children)
+ coroutineContext[Job]!!.joinChildren()
+
for ((worldName, map) in parcelsCache) {
val world = ParcelWorldId(worldName)
for ((pos, data) in map) {
diff --git a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt
index 3d07955..8564ad3 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/migration/plotme/PlotmeTables.kt
@@ -7,9 +7,9 @@ const val uppercase: Boolean = false
fun String.toCorrectCase() = if (uppercase) this else toLowerCase()
sealed class PlotmeTable(name: String) : Table(name) {
- val px = PlotmePlotsT.integer("idX").primaryKey()
- val pz = PlotmePlotsT.integer("idZ").primaryKey()
- val world_name = PlotmePlotsT.varchar("world", 32).primaryKey()
+ val px = integer("idX").primaryKey()
+ val pz = integer("idZ").primaryKey()
+ val world_name = varchar("world", 32).primaryKey()
}
object PlotmePlotsT : PlotmeTable("plotmePlots".toCorrectCase()) {
@@ -18,8 +18,8 @@ object PlotmePlotsT : PlotmeTable("plotmePlots".toCorrectCase()) {
}
sealed class PlotmePlotPlayerMap(name: String) : PlotmeTable(name) {
- val player_name = PlotmePlotsT.varchar("player", 32)
- val player_uuid = PlotmePlotsT.blob("playerid").nullable()
+ val player_name = varchar("player", 32)
+ val player_uuid = blob("playerid").nullable()
}
object PlotmeAllowedT : PlotmePlotPlayerMap("plotmeAllowed".toCorrectCase())