-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch_upserter.go
132 lines (118 loc) · 3.77 KB
/
batch_upserter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package dynamodb
import (
"context"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"log"
"reflect"
)
// BatchUpserter writes batches of models to a single DynamoDB table by
// handing them to UpsertMany.
type BatchUpserter struct {
	// DB is the DynamoDB client every write goes through.
	DB *dynamodb.DynamoDB
	// tableName is the table name forwarded to UpsertMany.
	tableName string
	// Map is an optional hook. When non-nil, Write calls it with the whole
	// batch and upserts the value it returns instead of the original input.
	Map func(ctx context.Context, model interface{}) (interface{}, error)
	// keys is the list of key names forwarded to UpsertMany.
	keys []string
}
// NewBatchUpserterById builds a BatchUpserter that writes to tableName through
// database, using keys as the key names handed to UpsertMany.
//
// Only the first element of options, if present, is used; it becomes the Map
// hook of the returned upserter. Additional options are ignored.
//
// modelType and fieldName are accepted so that existing callers keep
// compiling, but they do not influence the result: BatchUpserter has no field
// that stores an ID field name, so resolving one here would be discarded.
func NewBatchUpserterById(database *dynamodb.DynamoDB, tableName string, modelType reflect.Type, fieldName string, keys []string, options ...func(context.Context, interface{}) (interface{}, error)) *BatchUpserter {
	var mp func(context.Context, interface{}) (interface{}, error)
	if len(options) > 0 {
		mp = options[0]
	}
	return &BatchUpserter{Map: mp, DB: database, tableName: tableName, keys: keys}
}
// NewBatchUpserter builds a BatchUpserter without naming an ID field. It
// forwards all of its arguments to NewBatchUpserterById, passing an empty
// string as the field name.
func NewBatchUpserter(database *dynamodb.DynamoDB, tableName string, modelType reflect.Type, keys []string, options ...func(context.Context, interface{}) (interface{}, error)) *BatchUpserter {
	const unspecifiedField = ""
	upserter := NewBatchUpserterById(database, tableName, modelType, unspecifiedField, keys, options...)
	return upserter
}
// Write upserts models in one call to UpsertMany.
//
// If the Map hook is set, it is applied to models first and its result is what
// gets upserted; an error from the hook is returned immediately.
//
// It returns the indices that succeeded, the indices that failed, and an
// error. On success every index of models (0..Len-1) is reported as
// succeeded. On any error both index slices are returned empty alongside the
// error. Both slices are always non-nil.
func (w *BatchUpserter) Write(ctx context.Context, models interface{}) ([]int, []int, error) {
	succeeded := make([]int, 0)
	failed := make([]int, 0)

	// Decide what to send: the mapped batch when a hook is configured,
	// otherwise the input unchanged.
	payload := models
	if w.Map != nil {
		mapped, mapErr := w.Map(ctx, models)
		if mapErr != nil {
			return succeeded, failed, mapErr
		}
		payload = mapped
	}

	if _, _, upsertErr := UpsertMany(ctx, w.DB, w.tableName, w.keys, payload); upsertErr != nil {
		return succeeded, failed, upsertErr
	}

	// Indices are counted against the original input, not the mapped payload.
	total := reflect.ValueOf(models).Len()
	for idx := 0; idx < total; idx++ {
		succeeded = append(succeeded, idx)
	}
	return succeeded, failed, nil
}
// UpsertMany upserts the elements of models into tableName by passing them to
// TransactionUpsert.
//
// When models is a slice, each element is forwarded as one item. An empty
// slice short-circuits: no request is made and a pointer to a new zero value
// of models' type is returned as both the first and second results. When
// models is not a slice, TransactionUpsert is still called, with an empty item
// list.
//
// After a call to TransactionUpsert the first two results are always nil; the
// transaction output is only logged. The error, if any, is the one returned by
// TransactionUpsert.
//
// NOTE(review): models is passed to reflect.TypeOf and then reflect.New
// without a nil check, so callers are presumably expected to pass a non-nil
// value — confirm.
func UpsertMany(ctx context.Context, db *dynamodb.DynamoDB, tableName string, keys []string, models interface{}) (interface{}, interface{}, error) {
	modelsType := reflect.TypeOf(models)
	emptyResult := reflect.New(modelsType).Interface()

	items := make([]interface{}, 0)
	if modelsType.Kind() == reflect.Slice {
		values := reflect.ValueOf(models)
		count := values.Len()
		if count == 0 {
			return emptyResult, emptyResult, nil
		}
		for i := 0; i < count; i++ {
			items = append(items, values.Index(i).Interface())
		}
	}

	output, err := TransactionUpsert(ctx, db, items, tableName, keys)
	if err != nil {
		return nil, nil, err
	}
	log.Println(output)
	return nil, nil, nil
}
// TransactionUpsert writes every element of data to tableName in a single
// TransactWriteItems request.
//
// For each element it extracts the key values named by keys, marshals the
// element to an attribute map, and asks Exist whether an item with those key
// values is already stored. Elements that do not exist are added to the
// transaction as a Put of the full item; elements that do exist are added as
// an Update whose expression is produced by BuildUpdate.
//
// Any error while building a key map, marshalling an element, or checking
// existence aborts the call before a write request is sent. Errors are
// returned unwrapped. ctx is passed to the write request as well as to Exist.
//
// An empty data slice is not special-cased: the request is still built and
// handed to input.Validate, whose result is returned if it is an error.
//
// NOTE(review): the existence checks and the transaction are separate
// requests, so an item may appear or disappear in between — confirm this is
// acceptable for callers.
func TransactionUpsert(ctx context.Context, db *dynamodb.DynamoDB, data []interface{}, tableName string, keys []string) (*dynamodb.TransactWriteItemsOutput, error) {
	items := make([]*dynamodb.TransactWriteItem, 0, len(data))
	for _, d := range data {
		ids := getIdValueFromModel(d, keys)
		keyMap, err := buildKeyMap(keys, ids)
		if err != nil {
			return nil, err
		}
		av, err := dynamodbattribute.MarshalMap(d)
		if err != nil {
			return nil, err
		}
		// A failed existence check must not be treated as "item absent":
		// doing so would turn the write into a Put based on no information.
		exists, err := Exist(ctx, db, tableName, keys, ids)
		if err != nil {
			return nil, err
		}
		if !exists {
			items = append(items, &dynamodb.TransactWriteItem{
				Put: &dynamodb.Put{
					Item:      av,
					TableName: &tableName,
				},
			})
			continue
		}
		expressionValues, expressionNames, expression := BuildUpdate(av, keys)
		items = append(items, &dynamodb.TransactWriteItem{
			Update: &dynamodb.Update{
				ExpressionAttributeNames:  expressionNames,
				ExpressionAttributeValues: expressionValues,
				Key:                       keyMap,
				TableName:                 &tableName,
				UpdateExpression:          &expression,
			},
		})
	}

	input := &dynamodb.TransactWriteItemsInput{
		TransactItems: items,
	}
	// Validate locally so malformed input is rejected before any network call.
	if err := input.Validate(); err != nil {
		return nil, err
	}
	// Use the context-aware variant so cancellation and deadlines on ctx apply
	// to the write itself.
	return db.TransactWriteItemsWithContext(ctx, input)
}