diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt')
-rw-r--r-- | src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt | 66 |
1 files changed, 36 insertions, 30 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt index 5685346..01afd94 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt @@ -1,4 +1,4 @@ -@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName") +@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName", "UNUSED_EXPRESSION") package io.dico.parcels2.storage.exposed @@ -7,10 +7,10 @@ import io.dico.parcels2.* import io.dico.parcels2.storage.Backing import io.dico.parcels2.storage.DataPair import io.dico.parcels2.util.toUUID -import kotlinx.coroutines.experimental.CoroutineStart -import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.LinkedListChannel +import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -import kotlinx.coroutines.experimental.launch import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.SchemaUtils.create import org.jetbrains.exposed.sql.transactions.transaction @@ -21,14 +21,27 @@ import javax.sql.DataSource class ExposedDatabaseException(message: String? = null) : Exception(message) -class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : Backing { +class ExposedBacking(private val dataSourceFactory: () -> DataSource, + private val poolSize: Int) : Backing { override val name get() = "Exposed" + val dispatcher: CoroutineDispatcher = newFixedThreadPoolContext(4, "Parcels StorageThread") + private var dataSource: DataSource? = null private var database: Database? = null private var isShutdown: Boolean = false - override val isConnected get() = database != null + override fun launchJob(job: Backing.() -> Unit): Job = launch(dispatcher) { transaction { job() } } + override fun <T> launchFuture(future: Backing.() -> T): Deferred<T> = async(dispatcher) { transaction { future() } } + + override fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> { + val channel = LinkedListChannel<T>() + launchJob { future(channel) } + return channel + } + + private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement) + companion object { init { Database.registerDialect("mariadb") { @@ -37,24 +50,17 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement) - - private suspend fun transactionLaunch(statement: suspend Transaction.() -> Unit): Unit = transaction(database!!) { - launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) { - statement(this@transaction) - } - } + override fun init() { + if (isShutdown || isConnected) throw IllegalStateException() - override suspend fun init() { - if (isShutdown) throw IllegalStateException() dataSource = dataSourceFactory() database = Database.connect(dataSource!!) - transaction(database) { + transaction(database!!) { create(WorldsT, OwnersT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT) } } - override suspend fun shutdown() { + override fun shutdown() { if (isShutdown) throw IllegalStateException() dataSource?.let { (it as? HikariDataSource)?.close() @@ -63,15 +69,15 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : isShutdown = true } - override suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) { + override fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) { for (parcel in parcels) { val data = readParcelData(parcel) - channel.send(parcel to data) + channel.offer(parcel to data) } channel.close() } - override suspend fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = transactionLaunch { + override fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = ctransaction<Unit> { ParcelsT.selectAll().forEach { row -> val parcel = ParcelsT.getId(row) ?: return@forEach val data = rowToParcelData(row) @@ -80,12 +86,12 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : channel.close() } - override suspend fun readParcelData(parcel: ParcelId): ParcelData? = transaction { + override fun readParcelData(parcel: ParcelId): ParcelData? = transaction { val row = ParcelsT.getRow(parcel) ?: return@transaction null rowToParcelData(row) } - override suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction { + override fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction { val user_id = OwnersT.getId(user) ?: return@transaction emptyList() ParcelsT.select { ParcelsT.owner_id eq user_id } .orderBy(ParcelsT.claim_time, isAsc = true) @@ -93,7 +99,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : .toList() } - override suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) { + override fun setParcelData(parcel: ParcelId, data: ParcelData?) { if (data == null) { transaction { ParcelsT.getId(parcel)?.let { id -> @@ -125,7 +131,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : setParcelAllowsInteractInventory(parcel, data.allowInteractInventory) } - override suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction { + override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction { val id = if (owner == null) ParcelsT.getId(parcel) ?: return@transaction else @@ -140,11 +146,11 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction { + override fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction { AddedLocalT.setPlayerStatus(parcel, player, status) } - override suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction { + override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction { val id = ParcelsT.getOrInitId(parcel) ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) { it[ParcelOptionsT.parcel_id] = id @@ -152,7 +158,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction { + override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction { val id = ParcelsT.getOrInitId(parcel) ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) { it[ParcelOptionsT.parcel_id] = id @@ -160,16 +166,16 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = transactionLaunch { + override fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = ctransaction<Unit> { AddedGlobalT.sendAllAddedData(channel) channel.close() } - override suspend fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction { + override fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction { return@transaction AddedGlobalT.readAddedData(OwnersT.getId(owner) ?: return@transaction hashMapOf()) } - override suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction { + override fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction { AddedGlobalT.setPlayerStatus(owner, player, status) } |