forked from edgexfoundry/edgex-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.go
144 lines (122 loc) · 3.61 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//
// Copyright (C) 2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0
package postgres
import (
"context"
goErrors "errors"
"fmt"
"os"
"path/filepath"
"regexp"
"sort"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
)
var sqlFileNameRegexp = regexp.MustCompile(`([[:digit:]]+)-[[:word:]]+.sql`)
type sqlFileName struct {
order int
name string
}
type sqlFileNames []sqlFileName
func (sf sqlFileNames) Len() int {
return len(sf)
}
func (sf sqlFileNames) Less(i, j int) bool {
return sf[i].order < sf[j].order
}
func (sf sqlFileNames) Swap(i, j int) {
sf[i], sf[j] = sf[j], sf[i]
}
func (sf sqlFileNames) getSortedNames() []string {
sort.Sort(sf)
result := make([]string, len(sf))
for i, f := range sf {
result[i] = f.name
}
return result
}
func executeDBScripts(ctx context.Context, connPool *pgxpool.Pool, scriptsPath string) errors.EdgeX {
if len(scriptsPath) == 0 {
// skip script execution when the path is empty
return nil
}
// get the sorted sql files
sqlFiles, edgeXerr := sortedSqlFileNames(scriptsPath)
if edgeXerr != nil {
return edgeXerr
}
tx, err := connPool.Begin(ctx)
if err != nil {
return WrapDBError("failed to begin a transaction to execute sql files", err)
}
defer func() {
_ = tx.Rollback(ctx)
}()
// execute sql files in the sequence of ordering prefix as a transaction
for _, sqlFile := range sqlFiles {
// read sql file content
sqlContent, err := os.ReadFile(sqlFile)
if err != nil {
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to read sql file %s", sqlFile), err)
}
_, err = tx.Exec(ctx, string(sqlContent))
if err != nil {
return WrapDBError(fmt.Sprintf("failed to execute sql file %s", sqlFile), err)
}
}
if err = tx.Commit(ctx); err != nil {
return WrapDBError("failed to commit transaction for executing sql files", err)
}
return nil
}
func sortedSqlFileNames(sqlFilesDir string) ([]string, errors.EdgeX) {
sqlDir, err := os.Open(sqlFilesDir)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to open directory at %s", sqlFilesDir), err)
}
fileInfos, err := sqlDir.Readdir(0)
if err != nil {
return nil, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("failed to read sql files from %s", sqlFilesDir), err)
}
sqlFiles := sqlFileNames{}
for _, file := range fileInfos {
// ignore directories
if file.IsDir() {
continue
}
fileName := file.Name()
// ignore files whose name is not in the format of %d-%s.sql
if !sqlFileNameRegexp.MatchString(fileName) {
continue
}
var order int
var fileNameWithoutOrder string
_, err = fmt.Sscanf(fileName, "%d-%s", &order, &fileNameWithoutOrder)
// ignore mal-format sql files
if err != nil {
continue
}
sqlFiles = append(sqlFiles, sqlFileName{order, filepath.Join(sqlFilesDir, fileName)})
}
return sqlFiles.getSortedNames(), nil
}
// When DB operation fails with error, the pgx DB library put a much detailed information in the pgxError.Detail.
func WrapDBError(message string, err error) errors.EdgeX {
var pgErr *pgconn.PgError
if goErrors.As(err, &pgErr) {
if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) {
var errMsg string
if message != "" {
errMsg = message + ": "
}
errMsg += pgErr.Detail
return errors.NewCommonEdgeX(errors.KindDuplicateName, errMsg, nil)
}
return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil)
}
return errors.NewCommonEdgeX(errors.KindDatabaseError, message, err)
}