summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt66
1 files changed, 36 insertions, 30 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
index 5685346..01afd94 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
@@ -1,4 +1,4 @@
-@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName")
+@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName", "UNUSED_EXPRESSION")
package io.dico.parcels2.storage.exposed
@@ -7,10 +7,10 @@ import io.dico.parcels2.*
import io.dico.parcels2.storage.Backing
import io.dico.parcels2.storage.DataPair
import io.dico.parcels2.util.toUUID
-import kotlinx.coroutines.experimental.CoroutineStart
-import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.LinkedListChannel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
-import kotlinx.coroutines.experimental.launch
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SchemaUtils.create
import org.jetbrains.exposed.sql.transactions.transaction
@@ -21,14 +21,27 @@ import javax.sql.DataSource
class ExposedDatabaseException(message: String? = null) : Exception(message)
-class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : Backing {
+class ExposedBacking(private val dataSourceFactory: () -> DataSource,
+ private val poolSize: Int) : Backing {
override val name get() = "Exposed"
+ val dispatcher: CoroutineDispatcher = newFixedThreadPoolContext(4, "Parcels StorageThread")
+
private var dataSource: DataSource? = null
private var database: Database? = null
private var isShutdown: Boolean = false
-
override val isConnected get() = database != null
+ override fun launchJob(job: Backing.() -> Unit): Job = launch(dispatcher) { transaction { job() } }
+ override fun <T> launchFuture(future: Backing.() -> T): Deferred<T> = async(dispatcher) { transaction { future() } }
+
+ override fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> {
+ val channel = LinkedListChannel<T>()
+ launchJob { future(channel) }
+ return channel
+ }
+
+ private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement)
+
companion object {
init {
Database.registerDialect("mariadb") {
@@ -37,24 +50,17 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement)
-
- private suspend fun transactionLaunch(statement: suspend Transaction.() -> Unit): Unit = transaction(database!!) {
- launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) {
- statement(this@transaction)
- }
- }
+ override fun init() {
+ if (isShutdown || isConnected) throw IllegalStateException()
- override suspend fun init() {
- if (isShutdown) throw IllegalStateException()
dataSource = dataSourceFactory()
database = Database.connect(dataSource!!)
- transaction(database) {
+ transaction(database!!) {
create(WorldsT, OwnersT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT)
}
}
- override suspend fun shutdown() {
+ override fun shutdown() {
if (isShutdown) throw IllegalStateException()
dataSource?.let {
(it as? HikariDataSource)?.close()
@@ -63,15 +69,15 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
isShutdown = true
}
- override suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) {
+ override fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) {
for (parcel in parcels) {
val data = readParcelData(parcel)
- channel.send(parcel to data)
+ channel.offer(parcel to data)
}
channel.close()
}
- override suspend fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = transactionLaunch {
+ override fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = ctransaction<Unit> {
ParcelsT.selectAll().forEach { row ->
val parcel = ParcelsT.getId(row) ?: return@forEach
val data = rowToParcelData(row)
@@ -80,12 +86,12 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
channel.close()
}
- override suspend fun readParcelData(parcel: ParcelId): ParcelData? = transaction {
+ override fun readParcelData(parcel: ParcelId): ParcelData? = transaction {
val row = ParcelsT.getRow(parcel) ?: return@transaction null
rowToParcelData(row)
}
- override suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction {
+ override fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction {
val user_id = OwnersT.getId(user) ?: return@transaction emptyList()
ParcelsT.select { ParcelsT.owner_id eq user_id }
.orderBy(ParcelsT.claim_time, isAsc = true)
@@ -93,7 +99,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
.toList()
}
- override suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) {
+ override fun setParcelData(parcel: ParcelId, data: ParcelData?) {
if (data == null) {
transaction {
ParcelsT.getId(parcel)?.let { id ->
@@ -125,7 +131,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
setParcelAllowsInteractInventory(parcel, data.allowInteractInventory)
}
- override suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction {
+ override fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction {
val id = if (owner == null)
ParcelsT.getId(parcel) ?: return@transaction
else
@@ -140,11 +146,11 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction {
+ override fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction {
AddedLocalT.setPlayerStatus(parcel, player, status)
}
- override suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction {
+ override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction {
val id = ParcelsT.getOrInitId(parcel)
ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) {
it[ParcelOptionsT.parcel_id] = id
@@ -152,7 +158,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction {
+ override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction {
val id = ParcelsT.getOrInitId(parcel)
ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) {
it[ParcelOptionsT.parcel_id] = id
@@ -160,16 +166,16 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = transactionLaunch {
+ override fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = ctransaction<Unit> {
AddedGlobalT.sendAllAddedData(channel)
channel.close()
}
- override suspend fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction {
+ override fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction {
return@transaction AddedGlobalT.readAddedData(OwnersT.getId(owner) ?: return@transaction hashMapOf())
}
- override suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction {
+ override fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction {
AddedGlobalT.setPlayerStatus(owner, player, status)
}