Skip to content

Commit 34ad8aa

Browse files
committed
feat: Add core-data event related db methods for postgres
Relates to edgexfoundry#4847. Add core-data event related db methods for postgres db client. Signed-off-by: Lindsey Cheng <[email protected]>
1 parent a62407a commit 34ad8aa

File tree

10 files changed

+429
-12
lines changed

10 files changed

+429
-12
lines changed

cmd/core-data/res/db/sql/00-utils.sql

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
--
2+
-- Copyright (C) 2024 IOTech Ltd
3+
--
4+
-- SPDX-License-Identifier: Apache-2.0
5+
6+
-- schema for core-data related tables
7+
CREATE SCHEMA IF NOT EXISTS "core-data";
+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
--
2+
-- Copyright (C) 2024 IOTech Ltd
3+
--
4+
-- SPDX-License-Identifier: Apache-2.0
5+
6+
-- core-data.event is used to store the event information
7+
CREATE TABLE IF NOT EXISTS "core-data".event (
8+
id UUID PRIMARY KEY,
9+
content JSONB NOT NULL,
10+
created timestamptz NOT NULL DEFAULT now()
11+
);

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/fxamacker/cbor/v2 v2.7.0
1313
github.com/gomodule/redigo v1.9.2
1414
github.com/google/uuid v1.6.0
15+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
1516
github.com/jackc/pgx/v5 v5.6.0
1617
github.com/labstack/echo/v4 v4.11.4
1718
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
325325
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
326326
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
327327
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
328+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
329+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
328330
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
329331
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
330332
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=

internal/pkg/bootstrap/handlers/database.go

+18-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//
2-
// Copyright (C) 2020 IOTech Ltd
2+
// Copyright (C) 2020-2024 IOTech Ltd
33
//
44
// SPDX-License-Identifier: Apache-2.0
55

@@ -24,6 +24,12 @@ import (
2424
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
2525
)
2626

27+
const (
28+
baseScriptPath = "/res/db/sql"
29+
redisDBType = "redisdb"
30+
postgresDBType = "postgres"
31+
)
32+
2733
// httpServer defines the contract used to determine whether or not the http httpServer is running.
2834
type httpServer interface {
2935
IsRunning() bool
@@ -60,12 +66,12 @@ func (d Database) newDBClient(
6066
}
6167

6268
switch databaseInfo.Type {
63-
case "redisdb":
69+
case redisDBType:
6470
return redis.NewClient(databaseConfig, lc)
65-
case "postgres":
71+
case postgresDBType:
6672
databaseConfig.Username = credentials.Username
6773
// TODO: The baseScriptPath and extScriptPath should be passed in from the configuration file
68-
return postgres.NewClient(ctx, databaseConfig, "/res/db/sql", "", lc)
74+
return postgres.NewClient(ctx, databaseConfig, baseScriptPath, "", lc)
6975
default:
7076
return nil, db.ErrUnsupportedDatabase
7177
}
@@ -139,7 +145,13 @@ func (d Database) BootstrapHandler(
139145
},
140146
})
141147

142-
lc.Info("Database connected")
148+
var dbName string
149+
if dbInfo.Type == postgresDBType {
150+
dbName = "Postgres"
151+
} else {
152+
dbName = "Redis"
153+
}
154+
lc.Infof("%s Database connected", dbName)
143155
wg.Add(1)
144156
go func() {
145157
defer wg.Done()
@@ -153,7 +165,7 @@ func (d Database) BootstrapHandler(
153165
}
154166
time.Sleep(time.Second)
155167
}
156-
lc.Info("Database disconnected")
168+
lc.Infof("%s Database disconnected", dbName)
157169
}()
158170

159171
return true

internal/pkg/db/postgres/client.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ func NewClient(ctx context.Context, config db.Configuration, baseScriptPath, ext
6363

6464
// execute base DB scripts
6565
if edgeXerr = executeDBScripts(ctx, dc.ConnPool, baseScriptPath); edgeXerr != nil {
66-
return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute base DB scripts", edgeXerr)
66+
return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute Postgres base DB scripts", edgeXerr)
6767
}
68-
lc.Info("successfully execute base DB scripts")
68+
lc.Info("successfully execute Postgres base DB scripts")
6969

7070
// execute extension DB scripts
7171
if edgeXerr = executeDBScripts(ctx, dc.ConnPool, extScriptPath); edgeXerr != nil {
72-
return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute extension DB scripts", edgeXerr)
72+
return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute Postgres extension DB scripts", edgeXerr)
7373
}
7474

7575
return dc, nil

internal/pkg/db/postgres/utils.go

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"regexp"
1515
"sort"
1616

17+
"github.com/jackc/pgerrcode"
1718
"github.com/jackc/pgx/v5/pgconn"
1819
"github.com/jackc/pgx/v5/pgxpool"
1920

@@ -128,6 +129,9 @@ func sortedSqlFileNames(sqlFilesDir string) ([]string, errors.EdgeX) {
128129
func WrapDBError(message string, err error) errors.EdgeX {
129130
var pgErr *pgconn.PgError
130131
if goErrors.As(err, &pgErr) {
132+
if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) {
133+
return errors.NewCommonEdgeX(errors.KindDuplicateName, pgErr.Detail, nil)
134+
}
131135
return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil)
132136
}
133137
return errors.NewCommonEdgeX(errors.KindDatabaseError, message, err)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
//
2+
// Copyright (C) 2024 IOTech Ltd
3+
//
4+
// SPDX-License-Identifier: Apache-2.0
5+
6+
package postgres
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"time"
13+
14+
pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres"
15+
16+
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
17+
model "github.com/edgexfoundry/go-mod-core-contracts/v3/models"
18+
19+
"github.com/jackc/pgx/v5"
20+
"github.com/jackc/pgx/v5/pgxpool"
21+
22+
"github.com/google/uuid"
23+
)
24+
25+
const (
26+
eventTableName = "\"core-data\".event"
27+
28+
// constants relate to the event struct field names
29+
deviceNameField = "DeviceName"
30+
originField = "Origin"
31+
)
32+
33+
// AllEvents queries the events with the given range, offset, and limit
34+
func (c *Client) AllEvents(offset, limit int) ([]model.Event, errors.EdgeX) {
35+
ctx := context.Background()
36+
37+
events, err := queryEvents(ctx, c.ConnPool, sqlQueryContentWithPagination(eventTableName), offset, limit)
38+
if err != nil {
39+
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all events", err)
40+
}
41+
42+
return events, nil
43+
}
44+
45+
// AddEvent adds a new event model to DB
46+
func (c *Client) AddEvent(e model.Event) (model.Event, errors.EdgeX) {
47+
ctx := context.Background()
48+
49+
if e.Id == "" {
50+
e.Id = uuid.NewString()
51+
}
52+
event := model.Event{
53+
Id: e.Id,
54+
DeviceName: e.DeviceName,
55+
ProfileName: e.ProfileName,
56+
SourceName: e.SourceName,
57+
Origin: e.Origin,
58+
Tags: e.Tags,
59+
}
60+
eventBytes, err := json.Marshal(event)
61+
if err != nil {
62+
return model.Event{}, errors.NewCommonEdgeX(errors.KindContractInvalid, "unable to JSON marshal event model", err)
63+
}
64+
65+
_, err = c.ConnPool.Exec(
66+
ctx,
67+
sqlInsert(eventTableName, idCol, contentCol),
68+
e.Id,
69+
eventBytes,
70+
)
71+
if err != nil {
72+
//if pgClient.IsDupIdError(err) {
73+
// return model.Event{}, errors.NewCommonEdgeX(errors.KindDuplicateName, fmt.Sprintf("event id '%s' already exists", e.Id), err)
74+
//}
75+
return model.Event{}, pgClient.WrapDBError("failed to insert event", err)
76+
}
77+
78+
// TODO: readings included in this event will be added to database in the following PRs
79+
80+
return event, nil
81+
}
82+
83+
// EventById gets an event by id
84+
func (c *Client) EventById(id string) (model.Event, errors.EdgeX) {
85+
ctx := context.Background()
86+
var event model.Event
87+
88+
row := c.ConnPool.QueryRow(ctx, sqlQueryContentById(eventTableName), id)
89+
err := row.Scan(&event)
90+
if err != nil {
91+
return event, pgClient.WrapDBError(fmt.Sprintf("failed to query event with id '%s'", id), err)
92+
}
93+
94+
return event, nil
95+
}
96+
97+
// EventTotalCount returns the total count of Event from db
98+
func (c *Client) EventTotalCount() (uint32, errors.EdgeX) {
99+
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCount(eventTableName))
100+
}
101+
102+
// EventCountByDeviceName returns the count of Event associated a specific Device from db
103+
func (c *Client) EventCountByDeviceName(deviceName string) (uint32, errors.EdgeX) {
104+
fieldMap := map[string]any{deviceNameField: deviceName}
105+
sqlStatement, err := sqlQueryCountByJSONField(eventTableName, fieldMap)
106+
if err != nil {
107+
return 0, errors.NewCommonEdgeXWrapper(err)
108+
}
109+
110+
return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement)
111+
}
112+
113+
// EventCountByTimeRange returns the count of Event by time range from db
114+
func (c *Client) EventCountByTimeRange(start int, end int) (uint32, errors.EdgeX) {
115+
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByJSONFieldTimeRange(eventTableName, originField), start, end)
116+
}
117+
118+
// EventsByDeviceName query events by offset, limit and device name
119+
func (c *Client) EventsByDeviceName(offset int, limit int, name string) ([]model.Event, errors.EdgeX) {
120+
fieldMap := map[string]any{deviceNameField: name}
121+
sqlStatement, err := sqlQueryContentByJSONField(eventTableName, fieldMap)
122+
if err != nil {
123+
return nil, errors.NewCommonEdgeXWrapper(err)
124+
}
125+
126+
events, err := queryEvents(context.Background(), c.ConnPool, sqlStatement, offset, limit)
127+
if err != nil {
128+
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query events by device '%s'", name), err)
129+
}
130+
return events, nil
131+
}
132+
133+
// EventsByTimeRange query events by time range, offset, and limit
134+
func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([]model.Event, errors.EdgeX) {
135+
ctx := context.Background()
136+
sqlStatement := sqlQueryContentByJSONFieldTimeRange(eventTableName, originField)
137+
138+
events, err := queryEvents(ctx, c.ConnPool, sqlStatement, start, end, offset, limit)
139+
if err != nil {
140+
return nil, errors.NewCommonEdgeXWrapper(err)
141+
}
142+
return events, nil
143+
}
144+
145+
// DeleteEventById removes an event by id
146+
func (c *Client) DeleteEventById(id string) errors.EdgeX {
147+
sqlStatement := sqlDeleteById(eventTableName)
148+
149+
err := deleteEvents(context.Background(), c.ConnPool, sqlStatement, id)
150+
if err != nil {
151+
return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed delete event with id '%s'", id), err)
152+
}
153+
154+
// TODO: delete related readings associated to the deleted events
155+
156+
return nil
157+
}
158+
159+
// DeleteEventsByDeviceName deletes specific device's events and corresponding readings
160+
// This function is implemented to starts up two goroutines to delete readings and events in the background to achieve better performance
161+
func (c *Client) DeleteEventsByDeviceName(deviceName string) errors.EdgeX {
162+
ctx := context.Background()
163+
fieldMap := map[string]any{deviceNameField: deviceName}
164+
165+
sqlStatement, edgexErr := sqlDeleteByJSONField(eventTableName, fieldMap)
166+
if edgexErr != nil {
167+
return errors.NewCommonEdgeXWrapper(edgexErr)
168+
}
169+
170+
go func() {
171+
err := deleteEvents(ctx, c.ConnPool, sqlStatement)
172+
if err != nil {
173+
c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err)
174+
}
175+
}()
176+
177+
// TODO: delete related readings associated to the deleted events
178+
179+
return nil
180+
}
181+
182+
// DeleteEventsByAge deletes events and their corresponding readings that are older than age
183+
// This function is implemented to starts up two goroutines to delete readings and events in the background to achieve better performance
184+
func (c *Client) DeleteEventsByAge(age int64) errors.EdgeX {
185+
ctx := context.Background()
186+
expireTimestamp := time.Now().UnixNano() - age
187+
sqlStatement := sqlDeleteByAgeInJSONField(eventTableName, originField)
188+
189+
go func() {
190+
err := deleteEvents(ctx, c.ConnPool, sqlStatement, expireTimestamp)
191+
if err != nil {
192+
c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err)
193+
}
194+
}()
195+
196+
// TODO: delete related readings associated to the deleted events
197+
198+
return nil
199+
}
200+
201+
// queryEvents queries the data rows with given sql statement and passed args, and unmarshal the data rows to the Event model slice
202+
func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args ...any) ([]model.Event, errors.EdgeX) {
203+
rows, err := connPool.Query(ctx, sql, args...)
204+
if err != nil {
205+
return nil, pgClient.WrapDBError("query failed", err)
206+
}
207+
208+
defer rows.Close()
209+
210+
var events []model.Event
211+
events, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Event, error) {
212+
var event model.Event
213+
err := rows.Scan(&event)
214+
215+
// TODO: readings data will be added to the event model in the following PRs
216+
217+
return event, err
218+
})
219+
220+
if err != nil {
221+
return nil, pgClient.WrapDBError("failed to scan events", err)
222+
}
223+
224+
return events, nil
225+
}
226+
227+
// deleteEvents delete the data rows with given sql statement and passed args
228+
func deleteEvents(ctx context.Context, connPool *pgxpool.Pool, sqlStatement string, args ...any) errors.EdgeX {
229+
commandTag, err := connPool.Exec(
230+
ctx,
231+
sqlStatement,
232+
args...,
233+
)
234+
if commandTag.RowsAffected() == 0 {
235+
return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil)
236+
}
237+
if err != nil {
238+
return pgClient.WrapDBError("event(s) delete failed", err)
239+
}
240+
return nil
241+
}

0 commit comments

Comments
 (0)