Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add core-data postgres reading db client methods #4862

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cmd/core-data/res/db/sql/01-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,23 @@ CREATE TABLE IF NOT EXISTS core_data.event (
origin BIGINT,
tags JSONB
);

-- core_data.reading is used to store the reading information
CREATE TABLE IF NOT EXISTS core_data.reading (
id UUID PRIMARY KEY,
event_id UUID,
devicename TEXT,
profilename TEXT,
resourcename TEXT,
origin BIGINT,
valuetype TEXT DEFAULT '',
units TEXT DEFAULT '',
tags JSONB,
value TEXT,
mediatype TEXT,
binaryvalue BYTEA,
objectvalue JSONB,
CONSTRAINT fk_event
FOREIGN KEY(event_id)
REFERENCES core_data.event(id)
);
8 changes: 7 additions & 1 deletion internal/pkg/db/postgres/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ func WrapDBError(message string, err error) errors.EdgeX {
var pgErr *pgconn.PgError
if goErrors.As(err, &pgErr) {
if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) {
return errors.NewCommonEdgeX(errors.KindDuplicateName, pgErr.Detail, nil)
var errMsg string
if message != "" {
errMsg = message + ": "
}
errMsg += pgErr.Detail

return errors.NewCommonEdgeX(errors.KindDuplicateName, errMsg, nil)
}
return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil)
}
Expand Down
34 changes: 34 additions & 0 deletions internal/pkg/infrastructure/postgres/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Copyright (C) 2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package postgres

// constants relate to the postgres db schema names
const (
coreDataSchema = "core_data"
)

// constants relate to the postgres db table names
const (
eventTableName = coreDataSchema + ".event"
readingTableName = coreDataSchema + ".reading"
)

// constants relate to the event/reading postgres db table column names
const (
deviceNameCol = "devicename"
resourceNameCol = "resourcename"
profileNameCol = "profilename"
sourceNameCol = "sourcename"
originCol = "origin"
valueTypeCol = "valuetype"
unitsCol = "units"
tagsCol = "tags"
eventIdFKCol = "event_id"
valueCol = "value"
binaryValueCol = "binaryvalue"
mediaTypeCol = "mediatype"
objectValueCol = "objectvalue"
)
148 changes: 95 additions & 53 deletions internal/pkg/infrastructure/postgres/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
)

const (
eventTableName = "core_data.event"

// constants relate to the event struct field names
deviceNameCol = "devicename"
profileNameCol = "profilename"
sourceNameCol = "sourcename"
originCol = "origin"
tagsCol = "tags"
)

// AllEvents queries the events with the given range, offset, and limit
func (c *Client) AllEvents(offset, limit int) ([]model.Event, errors.EdgeX) {
ctx := context.Background()
Expand Down Expand Up @@ -65,22 +54,33 @@ func (c *Client) AddEvent(e model.Event) (model.Event, errors.EdgeX) {
return model.Event{}, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal event tags", err)
}

_, err = c.ConnPool.Exec(
ctx,
sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol),
event.Id,
event.DeviceName,
event.ProfileName,
event.SourceName,
event.Origin,
tagsBytes,
)
if err != nil {
return model.Event{}, pgClient.WrapDBError("failed to insert event", err)
}
err = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error {
// insert event in a transaction
_, err = tx.Exec(
ctx,
sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol),
event.Id,
event.DeviceName,
event.ProfileName,
event.SourceName,
event.Origin,
tagsBytes,
)
if err != nil {
return pgClient.WrapDBError("failed to insert event", err)
}

// TODO: readings included in this event will be added to database in the following PRs
// insert readings in a transaction
err = addReadingsInTx(tx, e.Readings, e.Id)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
return nil
})

if err != nil {
return model.Event{}, errors.NewCommonEdgeXWrapper(err)
}
return event, nil
}

Expand All @@ -96,9 +96,16 @@ func (c *Client) EventById(id string) (model.Event, errors.EdgeX) {

event, err = pgx.CollectExactlyOneRow(rows, func(row pgx.CollectableRow) (model.Event, error) {
e, err := pgx.RowToStructByNameLax[model.Event](row)
if err != nil {
return model.Event{}, err
}

// TODO: readings data will be added to the event model in the following PRs
readings, err := queryReadings(ctx, c.ConnPool, sqlQueryFieldsByCol(readingTableName, queryReadingCols, eventIdFKCol), e.Id)
if err != nil {
return model.Event{}, err
}

e.Readings = readings
return e, err
})
if err != nil {
Expand All @@ -125,7 +132,7 @@ func (c *Client) EventCountByDeviceName(deviceName string) (uint32, errors.EdgeX

// EventCountByTimeRange returns the count of Event by time range from db
func (c *Client) EventCountByTimeRange(start int, end int) (uint32, errors.EdgeX) {
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol), start, end)
return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol, nil), start, end)
}

// EventsByDeviceName query events by offset, limit and device name
Expand All @@ -142,7 +149,7 @@ func (c *Client) EventsByDeviceName(offset int, limit int, name string) ([]model
// EventsByTimeRange query events by time range, offset, and limit
func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([]model.Event, errors.EdgeX) {
ctx := context.Background()
sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol)
sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol, nil)

events, err := queryEvents(ctx, c.ConnPool, sqlStatement, start, end, offset, limit)
if err != nil {
Expand All @@ -153,15 +160,23 @@ func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([

// DeleteEventById removes an event by id
func (c *Client) DeleteEventById(id string) errors.EdgeX {
ctx := context.Background()
sqlStatement := sqlDeleteById(eventTableName)

err := deleteEvents(context.Background(), c.ConnPool, sqlStatement, id)
// delete event and readings in a transaction
err := pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error {
if err := deleteReadings(ctx, tx, id); err != nil {
return err
}

if err := deleteEvents(ctx, tx, sqlStatement, id); err != nil {
return err
}
return nil
})
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed delete event with id '%s'", id), err)
return errors.NewCommonEdgeX(errors.Kind(err), "failed delete event", err)
}

// TODO: delete related readings associated to the deleted events

return nil
}

Expand All @@ -173,14 +188,24 @@ func (c *Client) DeleteEventsByDeviceName(deviceName string) errors.EdgeX {
sqlStatement := sqlDeleteByColumn(eventTableName, deviceNameCol)

go func() {
err := deleteEvents(ctx, c.ConnPool, sqlStatement, deviceName)
if err != nil {
c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err)
}
// delete events and readings in a transaction
_ = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error {
// select the event-ids of the specified device name from event table as the sub-query of deleting readings
subSqlStatement := sqlQueryFieldsByCol(eventTableName, []string{idCol}, deviceNameCol)
if err := deleteReadingsBySubQuery(ctx, tx, subSqlStatement, deviceName); err != nil {
c.loggingClient.Errorf("failed delete readings with device '%s': %v", deviceName, err)
return err
}

err := deleteEvents(ctx, tx, sqlStatement, deviceName)
if err != nil {
c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err)
return err
}
return nil
})
}()

// TODO: delete related readings associated to the deleted events

return nil
}

Expand All @@ -192,14 +217,24 @@ func (c *Client) DeleteEventsByAge(age int64) errors.EdgeX {
sqlStatement := sqlDeleteTimeRangeByColumn(eventTableName, originCol)

go func() {
err := deleteEvents(ctx, c.ConnPool, sqlStatement, expireTimestamp)
if err != nil {
c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err)
}
// delete events and readings in a transaction
_ = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error {
// select the event ids within the origin time range from event table as the sub-query of deleting readings
subSqlStatement := sqlQueryFieldsByTimeRange(eventTableName, []string{idCol}, originCol)
if err := deleteReadingsBySubQuery(ctx, tx, subSqlStatement, expireTimestamp); err != nil {
c.loggingClient.Errorf("failed delete readings by age '%d' nanoseconds: %v", age, err)
return err
}

err := deleteEvents(ctx, tx, sqlStatement, expireTimestamp)
if err != nil {
c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err)
return err
}
return nil
})
}()

// TODO: delete related readings associated to the deleted events

return nil
}

Expand All @@ -213,10 +248,17 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args .
var events []model.Event
events, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Event, error) {
event, err := pgx.RowToStructByNameLax[model.Event](row)
if err != nil {
return model.Event{}, err
}

// TODO: readings data will be added to the event model in the following PRs
readings, err := queryReadings(ctx, connPool, sqlQueryFieldsByCol(readingTableName, queryReadingCols, eventIdFKCol), event.Id)
if err != nil {
return model.Event{}, err
}

return event, err
event.Readings = readings
return event, nil
})

if err != nil {
Expand All @@ -226,18 +268,18 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args .
return events, nil
}

// deleteEvents delete the data rows with given sql statement and passed args
func deleteEvents(ctx context.Context, connPool *pgxpool.Pool, sqlStatement string, args ...any) errors.EdgeX {
commandTag, err := connPool.Exec(
// deleteEvents delete the data rows with given sql statement and passed args in a db transaction
func deleteEvents(ctx context.Context, tx pgx.Tx, sqlStatement string, args ...any) errors.EdgeX {
commandTag, err := tx.Exec(
ctx,
sqlStatement,
args...,
)
if commandTag.RowsAffected() == 0 {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil)
}
if err != nil {
return pgClient.WrapDBError("event(s) delete failed", err)
}
if commandTag.RowsAffected() == 0 {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil)
}
return nil
}
44 changes: 44 additions & 0 deletions internal/pkg/infrastructure/postgres/models/reading.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (C) 2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package models

import "github.com/edgexfoundry/go-mod-core-contracts/v3/models"

// Reading struct contains the columns of the core_data.reading table in Postgres db relates to a reading
// which includes all the fields in BaseReading, BinaryReading, SimpleReading and ObjectReading
type Reading struct {
models.BaseReading
BinaryReading
SimpleReading
ObjectReading
}

type SimpleReading struct {
Value *string
}

type BinaryReading struct {
BinaryValue []byte
MediaType *string
}

type ObjectReading struct {
ObjectValue any
}

// GetBaseReading makes the Reading struct to implement the go-mod-core-contract Reading interface in models
func (r Reading) GetBaseReading() models.BaseReading {
return models.BaseReading{
Id: r.Id,
Origin: r.Origin,
DeviceName: r.DeviceName,
ResourceName: r.ResourceName,
ProfileName: r.ProfileName,
ValueType: r.ValueType,
Units: r.Units,
Tags: r.Tags,
}
}
Loading
Loading