summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt175
1 files changed, 112 insertions, 63 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
index 5685346..8ea6653 100644
--- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
+++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt
@@ -1,16 +1,19 @@
-@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName")
+@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName", "UNUSED_EXPRESSION")
package io.dico.parcels2.storage.exposed
import com.zaxxer.hikari.HikariDataSource
import io.dico.parcels2.*
+import io.dico.parcels2.storage.AddedDataPair
import io.dico.parcels2.storage.Backing
import io.dico.parcels2.storage.DataPair
+import io.dico.parcels2.util.synchronized
import io.dico.parcels2.util.toUUID
-import kotlinx.coroutines.experimental.CoroutineStart
-import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.channels.ArrayChannel
+import kotlinx.coroutines.experimental.channels.LinkedListChannel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
-import kotlinx.coroutines.experimental.launch
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SchemaUtils.create
import org.jetbrains.exposed.sql.transactions.transaction
@@ -21,14 +24,44 @@ import javax.sql.DataSource
class ExposedDatabaseException(message: String? = null) : Exception(message)
-class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : Backing {
+class ExposedBacking(private val dataSourceFactory: () -> DataSource, val poolSize: Int) : Backing {
override val name get() = "Exposed"
+ override val dispatcher: ThreadPoolDispatcher = newFixedThreadPoolContext(poolSize, "Parcels StorageThread")
+
private var dataSource: DataSource? = null
private var database: Database? = null
private var isShutdown: Boolean = false
-
override val isConnected get() = database != null
+ override fun launchJob(job: Backing.() -> Unit): Job = launch(dispatcher) { transaction { job() } }
+ override fun <T> launchFuture(future: Backing.() -> T): Deferred<T> = async(dispatcher) { transaction { future() } }
+
+ override fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> {
+ val channel = LinkedListChannel<T>()
+ launchJob { future(channel) }
+ return channel
+ }
+
+ override fun <T> openChannelForWriting(action: Backing.(T) -> Unit): SendChannel<T> {
+ val channel = ArrayChannel<T>(poolSize * 2)
+
+ repeat(poolSize) {
+ launch(dispatcher) {
+ try {
+ while (true) {
+ action(channel.receive())
+ }
+ } catch (ex: Exception) {
+ // channel closed
+ }
+ }
+ }
+
+ return channel
+ }
+
+ private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement)
+
companion object {
init {
Database.registerDialect("mariadb") {
@@ -37,63 +70,85 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement)
-
- private suspend fun transactionLaunch(statement: suspend Transaction.() -> Unit): Unit = transaction(database!!) {
- launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) {
- statement(this@transaction)
+ override fun init() {
+ synchronized {
+ if (isShutdown || isConnected) throw IllegalStateException()
+ dataSource = dataSourceFactory()
+ database = Database.connect(dataSource!!)
+ transaction(database!!) {
+ create(WorldsT, ProfilesT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT)
+ }
}
}
- override suspend fun init() {
- if (isShutdown) throw IllegalStateException()
- dataSource = dataSourceFactory()
- database = Database.connect(dataSource!!)
- transaction(database) {
- create(WorldsT, OwnersT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT)
+ override fun shutdown() {
+ synchronized {
+ if (isShutdown) throw IllegalStateException()
+ dataSource?.let {
+ (it as? HikariDataSource)?.close()
+ }
+ database = null
+ isShutdown = true
}
}
- override suspend fun shutdown() {
- if (isShutdown) throw IllegalStateException()
- dataSource?.let {
- (it as? HikariDataSource)?.close()
- }
- database = null
- isShutdown = true
+ private fun PlayerProfile.toOwnerProfile(): PlayerProfile {
+ if (this is PlayerProfile.Star) return PlayerProfile.Fake(PlayerProfile.Star.name)
+ return this
+ }
+
+ private fun PlayerProfile.Unresolved.toResolvedProfile(): PlayerProfile.Real {
+ return resolve(getPlayerUuidForName(name) ?: throwException())
+ }
+
+ private fun PlayerProfile.toResolvedProfile(): PlayerProfile {
+ if (this is PlayerProfile.Unresolved) return toResolvedProfile()
+ return this
+ }
+
+ private fun PlayerProfile.toRealProfile(): PlayerProfile.Real = when (this) {
+ is PlayerProfile.Real -> this
+ is PlayerProfile.Fake -> throw IllegalArgumentException("Fake profiles are not accepted")
+ is PlayerProfile.Unresolved -> toResolvedProfile()
+ else -> throw InternalError("Case should not be reached")
}
- override suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) {
+ override fun getPlayerUuidForName(name: String): UUID? {
+ return ProfilesT.slice(ProfilesT.uuid).select { ProfilesT.name.upperCase() eq name.toUpperCase() }
+ .firstOrNull()?.let { it[ProfilesT.uuid]?.toUUID() }
+ }
+
+ override fun transmitParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) {
for (parcel in parcels) {
val data = readParcelData(parcel)
- channel.send(parcel to data)
+ channel.offer(parcel to data)
}
channel.close()
}
- override suspend fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = transactionLaunch {
+ override fun transmitAllParcelData(channel: SendChannel<DataPair>) {
ParcelsT.selectAll().forEach { row ->
- val parcel = ParcelsT.getId(row) ?: return@forEach
+ val parcel = ParcelsT.getItem(row) ?: return@forEach
val data = rowToParcelData(row)
- channel.send(parcel to data)
+ channel.offer(parcel to data)
}
channel.close()
}
- override suspend fun readParcelData(parcel: ParcelId): ParcelData? = transaction {
- val row = ParcelsT.getRow(parcel) ?: return@transaction null
- rowToParcelData(row)
+ override fun readParcelData(parcel: ParcelId): ParcelData? {
+ val row = ParcelsT.getRow(parcel) ?: return null
+ return rowToParcelData(row)
}
- override suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction {
- val user_id = OwnersT.getId(user) ?: return@transaction emptyList()
- ParcelsT.select { ParcelsT.owner_id eq user_id }
+ override fun getOwnedParcels(user: PlayerProfile): List<ParcelId> {
+ val user_id = ProfilesT.getId(user.toOwnerProfile()) ?: return emptyList()
+ return ParcelsT.select { ParcelsT.owner_id eq user_id }
.orderBy(ParcelsT.claim_time, isAsc = true)
- .mapNotNull(ParcelsT::getId)
+ .mapNotNull(ParcelsT::getItem)
.toList()
}
- override suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) {
+ override fun setParcelData(parcel: ParcelId, data: ParcelData?) {
if (data == null) {
transaction {
ParcelsT.getId(parcel)?.let { id ->
@@ -117,21 +172,21 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
setParcelOwner(parcel, data.owner)
- for ((uuid, status) in data.addedMap) {
- setLocalPlayerStatus(parcel, uuid, status)
+ for ((profile, status) in data.addedMap) {
+ AddedLocalT.setPlayerStatus(parcel, profile, status)
}
setParcelAllowsInteractInputs(parcel, data.allowInteractInputs)
setParcelAllowsInteractInventory(parcel, data.allowInteractInventory)
}
- override suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction {
+ override fun setParcelOwner(parcel: ParcelId, owner: PlayerProfile?) {
val id = if (owner == null)
- ParcelsT.getId(parcel) ?: return@transaction
+ ParcelsT.getId(parcel) ?: return
else
ParcelsT.getOrInitId(parcel)
- val owner_id = owner?.let { OwnersT.getOrInitId(it) }
+ val owner_id = owner?.let { ProfilesT.getOrInitId(it.toOwnerProfile()) }
val time = owner?.let { DateTime.now() }
ParcelsT.update({ ParcelsT.id eq id }) {
@@ -140,11 +195,11 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction {
- AddedLocalT.setPlayerStatus(parcel, player, status)
+ override fun setLocalPlayerStatus(parcel: ParcelId, player: PlayerProfile, status: AddedStatus) {
+ AddedLocalT.setPlayerStatus(parcel, player.toRealProfile(), status)
}
- override suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction {
+ override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) {
val id = ParcelsT.getOrInitId(parcel)
ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) {
it[ParcelOptionsT.parcel_id] = id
@@ -152,7 +207,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction {
+ override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) {
val id = ParcelsT.getOrInitId(parcel)
ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) {
it[ParcelOptionsT.parcel_id] = id
@@ -160,36 +215,30 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) :
}
}
- override suspend fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = transactionLaunch {
+ override fun transmitAllGlobalAddedData(channel: SendChannel<AddedDataPair<PlayerProfile>>) {
AddedGlobalT.sendAllAddedData(channel)
channel.close()
}
- override suspend fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction {
- return@transaction AddedGlobalT.readAddedData(OwnersT.getId(owner) ?: return@transaction hashMapOf())
+ override fun readGlobalAddedData(owner: PlayerProfile): MutableAddedDataMap {
+ return AddedGlobalT.readAddedData(ProfilesT.getId(owner.toOwnerProfile()) ?: return hashMapOf())
}
- override suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction {
- AddedGlobalT.setPlayerStatus(owner, player, status)
+ override fun setGlobalPlayerStatus(owner: PlayerProfile, player: PlayerProfile, status: AddedStatus) {
+ AddedGlobalT.setPlayerStatus(owner, player.toRealProfile(), status)
}
private fun rowToParcelData(row: ResultRow) = ParcelDataHolder().apply {
- owner = row[ParcelsT.owner_id]?.let { OwnersT.getId(it) }
+ owner = row[ParcelsT.owner_id]?.let { ProfilesT.getItem(it) }
since = row[ParcelsT.claim_time]
- val parcelId = row[ParcelsT.id]
- addedMap = AddedLocalT.readAddedData(parcelId)
-
- AddedLocalT.select { AddedLocalT.attach_id eq parcelId }.forEach {
- val uuid = it[AddedLocalT.player_uuid].toUUID()
- val status = if (it[AddedLocalT.allowed_flag]) AddedStatus.ALLOWED else AddedStatus.BANNED
- setAddedStatus(uuid, status)
+ val id = row[ParcelsT.id]
+ ParcelOptionsT.select { ParcelOptionsT.parcel_id eq id }.firstOrNull()?.let { optrow ->
+ allowInteractInputs = optrow[ParcelOptionsT.interact_inputs]
+ allowInteractInventory = optrow[ParcelOptionsT.interact_inventory]
}
- ParcelOptionsT.select { ParcelOptionsT.parcel_id eq parcelId }.firstOrNull()?.let {
- allowInteractInputs = it[ParcelOptionsT.interact_inputs]
- allowInteractInventory = it[ParcelOptionsT.interact_inventory]
- }
+ addedMap = AddedLocalT.readAddedData(id)
}
}