-
Notifications
You must be signed in to change notification settings - Fork 697
/
Copy pathBatchInsertStatement.kt
105 lines (86 loc) · 4.61 KB
/
BatchInsertStatement.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package org.jetbrains.exposed.sql.statements
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.TransactionManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.util.*
/**
 * Thrown by the batch insert statements in this file when a row cannot be added to the current batch.
 *
 * @param message human-readable description of the inconsistency.
 */
internal class BatchDataInconsistentException(message: String) : Exception(message)
/**
 * Insert statement that collects several rows in [data] and exposes them through [arguments],
 * one argument list per row.
 *
 * The row currently being filled is the `values` map provided by the superclass. [addBatch] appends
 * that map to [data] and, on later calls, replaces the previously filled row with a snapshot copy.
 *
 * @param table forwarded to [InsertStatement].
 * @param ignore forwarded to [InsertStatement].
 */
open class BatchInsertStatement(table: Table, ignore: Boolean = false) : InsertStatement<List<Map<Column<*>, Any>>>(table, ignore) {

    override val flushCache: Boolean = false

    override val isAlwaysBatch = true

    /** Generated keys of the batch, read from `resultedValues`. */
    override val generatedKey: List<Map<Column<*>, Any>>?
        get() = resultedValues

    /** Rows added so far, in insertion order. After [addBatch] the last element is the live `values` map. */
    protected val data = ArrayList<MutableMap<Column<*>, Any?>>()

    /** A column may be left unset when its type is nullable or it has a client-side default value function. */
    private fun Column<*>.isDefaultable() = columnType.nullable || defaultValueFun != null

    /**
     * Builds the exception reported when [columns] were left without a value and cannot be defaulted.
     *
     * Columns are always rendered through `fullIdentity` so that every message produced by
     * [validateLastBatch] names columns the same way. The transaction is looked up only here,
     * i.e. only on the failure path.
     */
    private fun missingClientDefaults(columns: List<Column<*>>): BatchDataInconsistentException {
        val columnList = columns.joinToString { TransactionManager.current().fullIdentity(it) }
        return BatchDataInconsistentException("Can't add new batch because columns: $columnList don't have client default values. DB defaults don't support in batch inserts")
    }

    /**
     * Sets [value] for [column] in the row currently being filled.
     *
     * @throws BatchDataInconsistentException when there is a previous row that has no value for [column]
     * and the column is not defaultable, because that earlier row could not be completed.
     */
    override operator fun <S> set(column: Column<S>, value: S) {
        // data[size - 1] is the current row, so data[size - 2] is the previous one.
        if (data.size > 1 && column !in data[data.size - 2] && !column.isDefaultable()) {
            throw BatchDataInconsistentException("Can't set $value for ${TransactionManager.current().fullIdentity(column)} because previous insertion can't be defaulted for that column.")
        }
        super.set(column, value)
    }

    /**
     * Starts a new row.
     *
     * If a row already exists it is validated with [validateLastBatch], replaced in [data] by a copy,
     * and `values` is cleared so it can be reused for the new row. The cached [arguments] are reset.
     */
    fun addBatch() {
        if (data.isNotEmpty()) {
            validateLastBatch()
            // Snapshot the finished row: `values` is about to be cleared and reused.
            data[data.size - 1] = LinkedHashMap(values)
            values.clear()
        }
        data.add(values)
        arguments = null
    }

    /**
     * Checks that the row currently held in `values` can be inserted.
     *
     * @throws BatchDataInconsistentException when a non-defaultable column is missing from `values`.
     */
    internal open fun validateLastBatch() {
        // NOTE(review): when called from addBatch(), data.last() appears to be the same map instance as
        // `values`, which would make this difference always empty - confirm the intended comparison.
        val cantBeDefaulted = (data.last().keys - values.keys).filterNot { it.isDefaultable() }
        if (cantBeDefaulted.isNotEmpty()) {
            throw missingClientDefaults(cantBeDefaulted)
        }

        // Every column of the target tables must be set, defaultable, or auto-incremented.
        val requiredInTargets = (targets.flatMap { it.columns } - values.keys)
            .filter { !it.isDefaultable() && !it.columnType.isAutoInc }
        if (requiredInTargets.isNotEmpty()) {
            throw missingClientDefaults(requiredInTargets)
        }
    }

    /** Union of the columns used by any row, in first-seen order. */
    private fun allColumnsInDataSet(): Set<Column<*>> =
        data.flatMapTo(LinkedHashSet<Column<*>>()) { it.keys }

    /**
     * Per-row argument lists, computed on first read and cached until [addBatch] resets them.
     *
     * Each row holds the superclass' values-and-defaults for that row, plus an explicit `null` for every
     * nullable column that some row uses but this row does not, sorted by column.
     */
    override var arguments: List<List<Pair<Column<*>, Any?>>>? = null
        get() = field ?: run {
            val nullableColumns = allColumnsInDataSet().filter { it.columnType.nullable }
            data.map { single ->
                val valuesAndDefaults = super.valuesAndDefaults(single)
                (valuesAndDefaults + (nullableColumns - valuesAndDefaults.keys).associate { it to null }).toList().sortedBy { it.first }
            }.apply { field = this }
        }

    /**
     * Returns the first row of [arguments] as a map; the [values] parameter is not used.
     * Fails with the exception thrown by `first()` when no row has been added.
     */
    override fun valuesAndDefaults(values: Map<Column<*>, Any?>) = arguments!!.first().toMap()
}
/**
 * [BatchInsertStatement] variant that renders all rows as a single multi-row `VALUES` clause and,
 * when the table has an auto-increment column, adds an `OUTPUT inserted.<column> AS GENERATED_KEYS` part.
 *
 * The batch size is capped by a row limit and a parameter limit; exceeding either makes
 * [validateLastBatch] throw.
 *
 * @param table forwarded to [BatchInsertStatement].
 * @param ignore forwarded to [BatchInsertStatement].
 */
open class SQLServerBatchInsertStatement(table: Table, ignore: Boolean = false) : BatchInsertStatement(table, ignore) {

    override val isAlwaysBatch: Boolean = false

    /**
     * Runs the base validation, then enforces the row and parameter limits.
     *
     * @throws BatchDataInconsistentException when a limit is exceeded or the base validation fails.
     */
    override fun validateLastBatch() {
        super.validateLastBatch()

        val rowCount = data.size
        if (rowCount > OUTPUT_ROW_LIMIT) {
            throw BatchDataInconsistentException("Too much rows in one batch. Exceed $OUTPUT_ROW_LIMIT limit")
        }

        // The first row's column count is used as the per-row parameter count;
        // the +1 counts one more row than is currently stored.
        val paramsPerRow = data.firstOrNull()?.size ?: 0
        if (paramsPerRow * (rowCount + 1) > OUTPUT_PARAMS_LIMIT) {
            throw BatchDataInconsistentException("Too much parameters for batch with OUTPUT. Exceed $OUTPUT_PARAMS_LIMIT limit")
        }
    }

    /**
     * Builds the insert SQL: an optional `OUTPUT` part followed by one parenthesised tuple per row,
     * handed to the dialect's `insert` function together with the first row's columns.
     * With no rows the values expression is an empty string.
     */
    override fun prepareSQL(transaction: Transaction): String {
        val rows = arguments!!
        val valuesSql = if (rows.isEmpty()) {
            ""
        } else {
            val queryBuilder = QueryBuilder(true)
            val outputClause = table.autoIncColumn
                ?.let { " OUTPUT inserted.${transaction.identity(it)} AS GENERATED_KEYS" }
                .orEmpty()
            rows.joinToString(prefix = "$outputClause VALUES") { row ->
                row.joinToString(prefix = "(", postfix = ")") { (column, value) ->
                    queryBuilder.registerArgument(column, value)
                }
            }
        }
        return transaction.db.dialect.functionProvider.insert(
            isIgnore,
            table,
            rows.firstOrNull()?.map { it.first }.orEmpty(),
            valuesSql,
            transaction
        )
    }

    /** Flattens the per-row arguments into a single list; yields no argument list at all when [data] is empty. */
    override fun arguments() = super.arguments().flatten().let { flattened ->
        listOfNotNull(flattened.takeIf { data.isNotEmpty() })
    }

    /** Executes the statement as a query and pairs the number of rows in [arguments] with the result set. */
    override fun PreparedStatement.execInsertFunction(): Pair<Int, ResultSet?> {
        val rowCount = arguments!!.size
        return Pair(rowCount, executeQuery())
    }

    private companion object {
        /** Maximum number of rows allowed in one batch. */
        const val OUTPUT_ROW_LIMIT = 1000

        /** Maximum number of parameters allowed in one batch. */
        const val OUTPUT_PARAMS_LIMIT = 5000
    }
}