summaryrefslogtreecommitdiff
path: root/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt')
-rw-r--r--src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt118
1 files changed, 0 insertions, 118 deletions
diff --git a/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt b/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt
deleted file mode 100644
index ab707af..0000000
--- a/src/main/kotlin/io/dico/parcels2/storage/exposed/CoroutineTransactionManager.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-package io.dico.parcels2.storage.exposed
-
-import kotlinx.coroutines.experimental.*
-import org.jetbrains.exposed.sql.*
-import org.jetbrains.exposed.sql.statements.StatementContext
-import org.jetbrains.exposed.sql.statements.StatementInterceptor
-import org.jetbrains.exposed.sql.statements.expandArgs
-import org.jetbrains.exposed.sql.transactions.*
-import org.slf4j.LoggerFactory
-import java.sql.Connection
-import kotlin.coroutines.experimental.CoroutineContext
-
-fun <T> ctransaction(db: Database? = null, statement: suspend Transaction.() -> T): T {
- return ctransaction(TransactionManager.manager.defaultIsolationLevel, 3, db, statement)
-}
-
-fun <T> ctransaction(transactionIsolation: Int, repetitionAttempts: Int, db: Database? = null, statement: suspend Transaction.() -> T): T {
- return transaction(transactionIsolation, repetitionAttempts, db) {
- if (this !is CoroutineTransaction) throw IllegalStateException("ctransaction requires CoroutineTransactionManager.")
-
- val job = async(context = manager.context, start = CoroutineStart.UNDISPATCHED) {
- this@transaction.statement()
- }
-
- if (job.isActive) {
- runBlocking(context = Unconfined) {
- job.join()
- }
- }
-
- job.getCompleted()
- }
-}
-
-class CoroutineTransactionManager(private val db: Database,
- dispatcher: CoroutineDispatcher,
- override var defaultIsolationLevel: Int = DEFAULT_ISOLATION_LEVEL) : TransactionManager {
- val context: CoroutineDispatcher = TransactionCoroutineDispatcher(dispatcher)
- private val transaction = ThreadLocal<CoroutineTransaction?>()
-
- override fun currentOrNull(): Transaction? {
-
-
- return transaction.get()
- ?: null
- }
-
- override fun newTransaction(isolation: Int): Transaction {
- return CoroutineTransaction(this, CoroutineTransactionInterface(db, isolation, transaction)).also { transaction.set(it) }
- }
-
- private inner class TransactionCoroutineDispatcher(val delegate: CoroutineDispatcher) : CoroutineDispatcher() {
-
- // When the thread changes, move the transaction to the new thread
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- val existing = transaction.get()
-
- val newContext: CoroutineContext
- if (existing != null) {
- transaction.set(null)
- newContext = context // + existing
- } else {
- newContext = context
- }
-
- delegate.dispatch(newContext, Runnable {
- if (existing != null) {
- transaction.set(existing)
- }
-
- block.run()
- })
- }
-
- }
-
-}
-
-private class CoroutineTransaction(val manager: CoroutineTransactionManager,
- itf: CoroutineTransactionInterface) : Transaction(itf), CoroutineContext.Element {
- companion object Key : CoroutineContext.Key<CoroutineTransaction>
-
- override val key: CoroutineContext.Key<CoroutineTransaction> = Key
-}
-
-private class CoroutineTransactionInterface(override val db: Database, isolation: Int, val threadLocal: ThreadLocal<CoroutineTransaction?>) : TransactionInterface {
- private val connectionLazy = lazy(LazyThreadSafetyMode.NONE) {
- db.connector().apply {
- autoCommit = false
- transactionIsolation = isolation
- }
- }
- override val connection: Connection
- get() = connectionLazy.value
-
- override val outerTransaction: CoroutineTransaction? = threadLocal.get()
-
- override fun commit() {
- if (connectionLazy.isInitialized())
- connection.commit()
- }
-
- override fun rollback() {
- if (connectionLazy.isInitialized() && !connection.isClosed) {
- connection.rollback()
- }
- }
-
- override fun close() {
- try {
- if (connectionLazy.isInitialized()) connection.close()
- } finally {
- threadLocal.set(outerTransaction)
- }
- }
-
-}
-