diff options
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt')
-rw-r--r-- | src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt | 175 |
1 files changed, 112 insertions, 63 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt index 5685346..8ea6653 100644 --- a/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt +++ b/src/main/kotlin/io/dico/parcels2/storage/exposed/ExposedBacking.kt @@ -1,16 +1,19 @@ -@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName") +@file:Suppress("NOTHING_TO_INLINE", "PARAMETER_NAME_CHANGED_ON_OVERRIDE", "LocalVariableName", "UNUSED_EXPRESSION") package io.dico.parcels2.storage.exposed import com.zaxxer.hikari.HikariDataSource import io.dico.parcels2.* +import io.dico.parcels2.storage.AddedDataPair import io.dico.parcels2.storage.Backing import io.dico.parcels2.storage.DataPair +import io.dico.parcels2.util.synchronized import io.dico.parcels2.util.toUUID -import kotlinx.coroutines.experimental.CoroutineStart -import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.ArrayChannel +import kotlinx.coroutines.experimental.channels.LinkedListChannel +import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -import kotlinx.coroutines.experimental.launch import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.SchemaUtils.create import org.jetbrains.exposed.sql.transactions.transaction @@ -21,14 +24,44 @@ import javax.sql.DataSource class ExposedDatabaseException(message: String? = null) : Exception(message) -class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : Backing { +class ExposedBacking(private val dataSourceFactory: () -> DataSource, val poolSize: Int) : Backing { override val name get() = "Exposed" + override val dispatcher: ThreadPoolDispatcher = newFixedThreadPoolContext(poolSize, "Parcels StorageThread") + private var dataSource: DataSource? = null private var database: Database? = null private var isShutdown: Boolean = false - override val isConnected get() = database != null + override fun launchJob(job: Backing.() -> Unit): Job = launch(dispatcher) { transaction { job() } } + override fun <T> launchFuture(future: Backing.() -> T): Deferred<T> = async(dispatcher) { transaction { future() } } + + override fun <T> openChannel(future: Backing.(SendChannel<T>) -> Unit): ReceiveChannel<T> { + val channel = LinkedListChannel<T>() + launchJob { future(channel) } + return channel + } + + override fun <T> openChannelForWriting(action: Backing.(T) -> Unit): SendChannel<T> { + val channel = ArrayChannel<T>(poolSize * 2) + + repeat(poolSize) { + launch(dispatcher) { + try { + while (true) { + action(channel.receive()) + } + } catch (ex: Exception) { + // channel closed + } + } + } + + return channel + } + + private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement) + companion object { init { Database.registerDialect("mariadb") { @@ -37,63 +70,85 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - private fun <T> transaction(statement: Transaction.() -> T) = transaction(database!!, statement) - - private suspend fun transactionLaunch(statement: suspend Transaction.() -> Unit): Unit = transaction(database!!) { - launch(context = Unconfined, start = CoroutineStart.UNDISPATCHED) { - statement(this@transaction) + override fun init() { + synchronized { + if (isShutdown || isConnected) throw IllegalStateException() + dataSource = dataSourceFactory() + database = Database.connect(dataSource!!) + transaction(database!!) { + create(WorldsT, ProfilesT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT) + } } } - override suspend fun init() { - if (isShutdown) throw IllegalStateException() - dataSource = dataSourceFactory() - database = Database.connect(dataSource!!) - transaction(database) { - create(WorldsT, OwnersT, ParcelsT, ParcelOptionsT, AddedLocalT, AddedGlobalT) + override fun shutdown() { + synchronized { + if (isShutdown) throw IllegalStateException() + dataSource?.let { + (it as? HikariDataSource)?.close() + } + database = null + isShutdown = true } } - override suspend fun shutdown() { - if (isShutdown) throw IllegalStateException() - dataSource?.let { - (it as? HikariDataSource)?.close() - } - database = null - isShutdown = true + private fun PlayerProfile.toOwnerProfile(): PlayerProfile { + if (this is PlayerProfile.Star) return PlayerProfile.Fake(PlayerProfile.Star.name) + return this + } + + private fun PlayerProfile.Unresolved.toResolvedProfile(): PlayerProfile.Real { + return resolve(getPlayerUuidForName(name) ?: throwException()) + } + + private fun PlayerProfile.toResolvedProfile(): PlayerProfile { + if (this is PlayerProfile.Unresolved) return toResolvedProfile() + return this + } + + private fun PlayerProfile.toRealProfile(): PlayerProfile.Real = when (this) { + is PlayerProfile.Real -> this + is PlayerProfile.Fake -> throw IllegalArgumentException("Fake profiles are not accepted") + is PlayerProfile.Unresolved -> toResolvedProfile() + else -> throw InternalError("Case should not be reached") } - override suspend fun produceParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) { + override fun getPlayerUuidForName(name: String): UUID? { + return ProfilesT.slice(ProfilesT.uuid).select { ProfilesT.name.upperCase() eq name.toUpperCase() } + .firstOrNull()?.let { it[ProfilesT.uuid]?.toUUID() } + } + + override fun transmitParcelData(channel: SendChannel<DataPair>, parcels: Sequence<ParcelId>) { for (parcel in parcels) { val data = readParcelData(parcel) - channel.send(parcel to data) + channel.offer(parcel to data) } channel.close() } - override suspend fun produceAllParcelData(channel: SendChannel<Pair<ParcelId, ParcelData?>>) = transactionLaunch { + override fun transmitAllParcelData(channel: SendChannel<DataPair>) { ParcelsT.selectAll().forEach { row -> - val parcel = ParcelsT.getId(row) ?: return@forEach + val parcel = ParcelsT.getItem(row) ?: return@forEach val data = rowToParcelData(row) - channel.send(parcel to data) + channel.offer(parcel to data) } channel.close() } - override suspend fun readParcelData(parcel: ParcelId): ParcelData? = transaction { - val row = ParcelsT.getRow(parcel) ?: return@transaction null - rowToParcelData(row) + override fun readParcelData(parcel: ParcelId): ParcelData? { + val row = ParcelsT.getRow(parcel) ?: return null + return rowToParcelData(row) } - override suspend fun getOwnedParcels(user: ParcelOwner): List<ParcelId> = transaction { - val user_id = OwnersT.getId(user) ?: return@transaction emptyList() - ParcelsT.select { ParcelsT.owner_id eq user_id } + override fun getOwnedParcels(user: PlayerProfile): List<ParcelId> { + val user_id = ProfilesT.getId(user.toOwnerProfile()) ?: return emptyList() + return ParcelsT.select { ParcelsT.owner_id eq user_id } .orderBy(ParcelsT.claim_time, isAsc = true) - .mapNotNull(ParcelsT::getId) + .mapNotNull(ParcelsT::getItem) .toList() } - override suspend fun setParcelData(parcel: ParcelId, data: ParcelData?) { + override fun setParcelData(parcel: ParcelId, data: ParcelData?) { if (data == null) { transaction { ParcelsT.getId(parcel)?.let { id -> @@ -117,21 +172,21 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : setParcelOwner(parcel, data.owner) - for ((uuid, status) in data.addedMap) { - setLocalPlayerStatus(parcel, uuid, status) + for ((profile, status) in data.addedMap) { + AddedLocalT.setPlayerStatus(parcel, profile, status) } setParcelAllowsInteractInputs(parcel, data.allowInteractInputs) setParcelAllowsInteractInventory(parcel, data.allowInteractInventory) } - override suspend fun setParcelOwner(parcel: ParcelId, owner: ParcelOwner?) = transaction { + override fun setParcelOwner(parcel: ParcelId, owner: PlayerProfile?) { val id = if (owner == null) - ParcelsT.getId(parcel) ?: return@transaction + ParcelsT.getId(parcel) ?: return else ParcelsT.getOrInitId(parcel) - val owner_id = owner?.let { OwnersT.getOrInitId(it) } + val owner_id = owner?.let { ProfilesT.getOrInitId(it.toOwnerProfile()) } val time = owner?.let { DateTime.now() } ParcelsT.update({ ParcelsT.id eq id }) { @@ -140,11 +195,11 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun setLocalPlayerStatus(parcel: ParcelId, player: UUID, status: AddedStatus) = transaction { - AddedLocalT.setPlayerStatus(parcel, player, status) + override fun setLocalPlayerStatus(parcel: ParcelId, player: PlayerProfile, status: AddedStatus) { + AddedLocalT.setPlayerStatus(parcel, player.toRealProfile(), status) } - override suspend fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean): Unit = transaction { + override fun setParcelAllowsInteractInventory(parcel: ParcelId, value: Boolean) { val id = ParcelsT.getOrInitId(parcel) ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) { it[ParcelOptionsT.parcel_id] = id @@ -152,7 +207,7 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean): Unit = transaction { + override fun setParcelAllowsInteractInputs(parcel: ParcelId, value: Boolean) { val id = ParcelsT.getOrInitId(parcel) ParcelOptionsT.upsert(ParcelOptionsT.parcel_id) { it[ParcelOptionsT.parcel_id] = id @@ -160,36 +215,30 @@ class ExposedBacking(private val dataSourceFactory: suspend () -> DataSource) : } } - override suspend fun produceAllGlobalAddedData(channel: SendChannel<Pair<ParcelOwner, MutableMap<UUID, AddedStatus>>>) = transactionLaunch { + override fun transmitAllGlobalAddedData(channel: SendChannel<AddedDataPair<PlayerProfile>>) { AddedGlobalT.sendAllAddedData(channel) channel.close() } - override suspend fun readGlobalAddedData(owner: ParcelOwner): MutableMap<UUID, AddedStatus> = transaction { - return@transaction AddedGlobalT.readAddedData(OwnersT.getId(owner) ?: return@transaction hashMapOf()) + override fun readGlobalAddedData(owner: PlayerProfile): MutableAddedDataMap { + return AddedGlobalT.readAddedData(ProfilesT.getId(owner.toOwnerProfile()) ?: return hashMapOf()) } - override suspend fun setGlobalPlayerStatus(owner: ParcelOwner, player: UUID, status: AddedStatus) = transaction { - AddedGlobalT.setPlayerStatus(owner, player, status) + override fun setGlobalPlayerStatus(owner: PlayerProfile, player: PlayerProfile, status: AddedStatus) { + AddedGlobalT.setPlayerStatus(owner, player.toRealProfile(), status) } private fun rowToParcelData(row: ResultRow) = ParcelDataHolder().apply { - owner = row[ParcelsT.owner_id]?.let { OwnersT.getId(it) } + owner = row[ParcelsT.owner_id]?.let { ProfilesT.getItem(it) } since = row[ParcelsT.claim_time] - val parcelId = row[ParcelsT.id] - addedMap = AddedLocalT.readAddedData(parcelId) - - AddedLocalT.select { AddedLocalT.attach_id eq parcelId }.forEach { - val uuid = it[AddedLocalT.player_uuid].toUUID() - val status = if (it[AddedLocalT.allowed_flag]) AddedStatus.ALLOWED else AddedStatus.BANNED - setAddedStatus(uuid, status) + val id = row[ParcelsT.id] + ParcelOptionsT.select { ParcelOptionsT.parcel_id eq id }.firstOrNull()?.let { optrow -> + allowInteractInputs = optrow[ParcelOptionsT.interact_inputs] + allowInteractInventory = optrow[ParcelOptionsT.interact_inventory] } - ParcelOptionsT.select { ParcelOptionsT.parcel_id eq parcelId }.firstOrNull()?.let { - allowInteractInputs = it[ParcelOptionsT.interact_inputs] - allowInteractInventory = it[ParcelOptionsT.interact_inventory] - } + addedMap = AddedLocalT.readAddedData(id) } } |