diff --git a/cmd/core-data/res/db/sql/01-tables.sql b/cmd/core-data/res/db/sql/01-tables.sql index 2e4308cdda..4e58b038d0 100644 --- a/cmd/core-data/res/db/sql/01-tables.sql +++ b/cmd/core-data/res/db/sql/01-tables.sql @@ -12,3 +12,23 @@ CREATE TABLE IF NOT EXISTS core_data.event ( origin BIGINT, tags JSONB ); + +-- core_data.reading is used to store the reading information +CREATE TABLE IF NOT EXISTS core_data.reading ( + id UUID PRIMARY KEY, + event_id UUID, + devicename TEXT, + profilename TEXT, + resourcename TEXT, + origin BIGINT, + valuetype TEXT DEFAULT '', + units TEXT DEFAULT '', + tags JSONB, + value TEXT, + mediatype TEXT, + binaryvalue BYTEA, + objectvalue JSONB, + CONSTRAINT fk_event + FOREIGN KEY(event_id) + REFERENCES core_data.event(id) +); diff --git a/internal/pkg/db/postgres/utils.go b/internal/pkg/db/postgres/utils.go index 6499dfbd09..3a982c04a7 100644 --- a/internal/pkg/db/postgres/utils.go +++ b/internal/pkg/db/postgres/utils.go @@ -130,7 +130,13 @@ func WrapDBError(message string, err error) errors.EdgeX { var pgErr *pgconn.PgError if goErrors.As(err, &pgErr) { if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) { - return errors.NewCommonEdgeX(errors.KindDuplicateName, pgErr.Detail, nil) + var errMsg string + if message != "" { + errMsg = message + ": " + } + errMsg += pgErr.Detail + + return errors.NewCommonEdgeX(errors.KindDuplicateName, errMsg, nil) } return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil) } diff --git a/internal/pkg/infrastructure/postgres/consts.go b/internal/pkg/infrastructure/postgres/consts.go new file mode 100644 index 0000000000..86b2f7ae8d --- /dev/null +++ b/internal/pkg/infrastructure/postgres/consts.go @@ -0,0 +1,34 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +// constants relate to the postgres db schema names +const ( + coreDataSchema = "core_data" +) + +// constants relate to the postgres db table names +const ( + eventTableName = coreDataSchema + ".event" + readingTableName = coreDataSchema + ".reading" +) + +// constants relate to the event/reading postgres db table column names +const ( + deviceNameCol = "devicename" + resourceNameCol = "resourcename" + profileNameCol = "profilename" + sourceNameCol = "sourcename" + originCol = "origin" + valueTypeCol = "valuetype" + unitsCol = "units" + tagsCol = "tags" + eventIdFKCol = "event_id" + valueCol = "value" + binaryValueCol = "binaryvalue" + mediaTypeCol = "mediatype" + objectValueCol = "objectvalue" +) diff --git a/internal/pkg/infrastructure/postgres/event.go b/internal/pkg/infrastructure/postgres/event.go index 59a4802e6f..7127d8a1c7 100644 --- a/internal/pkg/infrastructure/postgres/event.go +++ b/internal/pkg/infrastructure/postgres/event.go @@ -22,17 +22,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -const ( - eventTableName = "core_data.event" - - // constants relate to the event struct field names - deviceNameCol = "devicename" - profileNameCol = "profilename" - sourceNameCol = "sourcename" - originCol = "origin" - tagsCol = "tags" -) - // AllEvents queries the events with the given range, offset, and limit func (c *Client) AllEvents(offset, limit int) ([]model.Event, errors.EdgeX) { ctx := context.Background() @@ -65,22 +54,33 @@ func (c *Client) AddEvent(e model.Event) (model.Event, errors.EdgeX) { return model.Event{}, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal event tags", err) } - _, err = c.ConnPool.Exec( - ctx, - sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol), - event.Id, - event.DeviceName, - event.ProfileName, - event.SourceName, - event.Origin, - tagsBytes, - ) - if err != nil { - return model.Event{}, pgClient.WrapDBError("failed to insert event", err) - } + err = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + // insert event in a transaction + _, err = tx.Exec( + ctx, + sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol), + event.Id, + event.DeviceName, + event.ProfileName, + event.SourceName, + event.Origin, + tagsBytes, + ) + if err != nil { + return pgClient.WrapDBError("failed to insert event", err) + } - // TODO: readings included in this event will be added to database in the following PRs + // insert readings in a transaction + err = addReadingsInTx(tx, e.Readings, e.Id) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + return nil + }) + if err != nil { + return model.Event{}, errors.NewCommonEdgeXWrapper(err) + } return event, nil } @@ -96,9 +96,16 @@ func (c *Client) EventById(id string) (model.Event, errors.EdgeX) { event, err = pgx.CollectExactlyOneRow(rows, func(row pgx.CollectableRow) (model.Event, error) { e, err := pgx.RowToStructByNameLax[model.Event](row) + if err != nil { + return model.Event{}, err + } - // TODO: readings data will be added to the event model in the following PRs + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryFieldsByCol(readingTableName, queryReadingCols, eventIdFKCol), e.Id) + if err != nil { + return model.Event{}, err + } + e.Readings = readings return e, err }) if err != nil { @@ -125,7 +132,7 @@ func (c *Client) EventCountByDeviceName(deviceName string) (uint32, errors.EdgeX // EventCountByTimeRange returns the count of Event by time range from db func (c *Client) EventCountByTimeRange(start int, end int) (uint32, errors.EdgeX) { - return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol), start, end) + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol, nil), start, end) } // EventsByDeviceName query events by offset, limit and device name @@ -142,7 +149,7 @@ func (c *Client) EventsByDeviceName(offset int, limit int, name string) ([]model // EventsByTimeRange query events by time range, offset, and limit func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([]model.Event, errors.EdgeX) { ctx := context.Background() - sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol) + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol, nil) events, err := queryEvents(ctx, c.ConnPool, sqlStatement, start, end, offset, limit) if err != nil { @@ -153,15 +160,23 @@ func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([ // DeleteEventById removes an event by id func (c *Client) DeleteEventById(id string) errors.EdgeX { + ctx := context.Background() sqlStatement := sqlDeleteById(eventTableName) - err := deleteEvents(context.Background(), c.ConnPool, sqlStatement, id) + // delete event and readings in a transaction + err := pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + if err := deleteReadings(ctx, tx, id); err != nil { + return err + } + + if err := deleteEvents(ctx, tx, sqlStatement, id); err != nil { + return err + } + return nil + }) if err != nil { - return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed delete event with id '%s'", id), err) + return errors.NewCommonEdgeX(errors.Kind(err), "failed delete event", err) } - - // TODO: delete related readings associated to the deleted events - return nil } @@ -173,14 +188,24 @@ func (c *Client) DeleteEventsByDeviceName(deviceName string) errors.EdgeX { sqlStatement := sqlDeleteByColumn(eventTableName, deviceNameCol) go func() { - err := deleteEvents(ctx, c.ConnPool, sqlStatement, deviceName) - if err != nil { - c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err) - } + // delete events and readings in a transaction + _ = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + // select the event-ids of the specified device name from event table as the sub-query of deleting readings + subSqlStatement := sqlQueryFieldsByCol(eventTableName, []string{idCol}, deviceNameCol) + if err := deleteReadingsBySubQuery(ctx, tx, subSqlStatement, deviceName); err != nil { + c.loggingClient.Errorf("failed delete readings with device '%s': %v", deviceName, err) + return err + } + + err := deleteEvents(ctx, tx, sqlStatement, deviceName) + if err != nil { + c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err) + return err + } + return nil + }) }() - // TODO: delete related readings associated to the deleted events - return nil } @@ -192,14 +217,24 @@ func (c *Client) DeleteEventsByAge(age int64) errors.EdgeX { sqlStatement := sqlDeleteTimeRangeByColumn(eventTableName, originCol) go func() { - err := deleteEvents(ctx, c.ConnPool, sqlStatement, expireTimestamp) - if err != nil { - c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err) - } + // delete events and readings in a transaction + _ = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + // select the event ids within the origin time range from event table as the sub-query of deleting readings + subSqlStatement := sqlQueryFieldsByTimeRange(eventTableName, []string{idCol}, originCol) + if err := deleteReadingsBySubQuery(ctx, tx, subSqlStatement, expireTimestamp); err != nil { + c.loggingClient.Errorf("failed delete readings by age '%d' nanoseconds: %v", age, err) + return err + } + + err := deleteEvents(ctx, tx, sqlStatement, expireTimestamp) + if err != nil { + c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err) + return err + } + return nil + }) }() - // TODO: delete related readings associated to the deleted events - return nil } @@ -213,10 +248,17 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args . var events []model.Event events, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Event, error) { event, err := pgx.RowToStructByNameLax[model.Event](row) + if err != nil { + return model.Event{}, err + } - // TODO: readings data will be added to the event model in the following PRs + readings, err := queryReadings(ctx, connPool, sqlQueryFieldsByCol(readingTableName, queryReadingCols, eventIdFKCol), event.Id) + if err != nil { + return model.Event{}, err + } - return event, err + event.Readings = readings + return event, nil }) if err != nil { @@ -226,18 +268,18 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args . return events, nil } -// deleteEvents delete the data rows with given sql statement and passed args -func deleteEvents(ctx context.Context, connPool *pgxpool.Pool, sqlStatement string, args ...any) errors.EdgeX { - commandTag, err := connPool.Exec( +// deleteEvents delete the data rows with given sql statement and passed args in a db transaction +func deleteEvents(ctx context.Context, tx pgx.Tx, sqlStatement string, args ...any) errors.EdgeX { + commandTag, err := tx.Exec( ctx, sqlStatement, args..., ) - if commandTag.RowsAffected() == 0 { - return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil) - } if err != nil { return pgClient.WrapDBError("event(s) delete failed", err) } + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil) + } return nil } diff --git a/internal/pkg/infrastructure/postgres/models/reading.go b/internal/pkg/infrastructure/postgres/models/reading.go new file mode 100644 index 0000000000..7d5de1fabf --- /dev/null +++ b/internal/pkg/infrastructure/postgres/models/reading.go @@ -0,0 +1,44 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package models + +import "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + +// Reading struct contains the columns of the core_data.reading table in Postgres db relates to a reading +// which includes all the fields in BaseReading, BinaryReading, SimpleReading and ObjectReading +type Reading struct { + models.BaseReading + BinaryReading + SimpleReading + ObjectReading +} + +type SimpleReading struct { + Value *string +} + +type BinaryReading struct { + BinaryValue []byte + MediaType *string +} + +type ObjectReading struct { + ObjectValue any +} + +// GetBaseReading makes the Reading struct to implement the go-mod-core-contract Reading interface in models +func (r Reading) GetBaseReading() models.BaseReading { + return models.BaseReading{ + Id: r.Id, + Origin: r.Origin, + DeviceName: r.DeviceName, + ResourceName: r.ResourceName, + ProfileName: r.ProfileName, + ValueType: r.ValueType, + Units: r.Units, + Tags: r.Tags, + } +} diff --git a/internal/pkg/infrastructure/postgres/reading.go b/internal/pkg/infrastructure/postgres/reading.go index 6969f51211..138efac5b2 100644 --- a/internal/pkg/infrastructure/postgres/reading.go +++ b/internal/pkg/infrastructure/postgres/reading.go @@ -6,78 +6,391 @@ package postgres import ( + "context" + "encoding/json" + "fmt" + "strings" + + pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres" + dbModels "github.com/edgexfoundry/edgex-go/internal/pkg/infrastructure/postgres/models" + "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" model "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +var ( + // insertReadingCols defines the reading table columns in slice used in inserting readings + insertReadingCols = []string{idCol, eventIdFKCol, deviceNameCol, profileNameCol, resourceNameCol, originCol, valueTypeCol, unitsCol, tagsCol, valueCol, mediaTypeCol, binaryValueCol, objectValueCol} + // queryReadingCols defines the reading table columns in slice used in querying reading + queryReadingCols = []string{idCol, deviceNameCol, profileNameCol, resourceNameCol, originCol, valueTypeCol, unitsCol, tagsCol, valueCol, objectValueCol, mediaTypeCol, binaryValueCol} ) func (c *Client) ReadingTotalCount() (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCount(readingTableName)) } func (c *Client) AllReadings(offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil -} + ctx := context.Background() -func (c *Client) ReadingsByTimeRange(start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryAllWithPagination(readingTableName), offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all readings", err) + } + + return readings, nil } +// ReadingsByResourceName query readings by offset, limit and resource name func (c *Client) ReadingsByResourceName(offset int, limit int, resourceName string) ([]model.Reading, errors.EdgeX) { - return nil, nil + sqlStatement := sqlQueryAllByColWithPagination(readingTableName, resourceNameCol) + + readings, err := queryReadings(context.Background(), c.ConnPool, sqlStatement, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query readings by resource '%s'", resourceName), err) + } + return readings, nil } +// ReadingsByDeviceName query readings by offset, limit and device name func (c *Client) ReadingsByDeviceName(offset int, limit int, name string) ([]model.Reading, errors.EdgeX) { - return nil, nil + sqlStatement := sqlQueryAllByColWithPagination(readingTableName, deviceNameCol) + + readings, err := queryReadings(context.Background(), c.ConnPool, sqlStatement, name, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query readings by device '%s'", name), err) + } + return readings, nil } +// ReadingsByDeviceNameAndResourceName query readings by offset, limit, device name and resource name func (c *Client) ReadingsByDeviceNameAndResourceName(deviceName string, resourceName string, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil + sqlStatement := sqlQueryAllByColWithPagination(readingTableName, deviceNameCol, resourceNameCol) + + readings, err := queryReadings(context.Background(), c.ConnPool, sqlStatement, deviceName, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, + fmt.Sprintf("failed to query readings by device '%s' and resource '%s'", deviceName, resourceName), err) + } + return readings, nil +} + +// ReadingsByTimeRange query readings by origin within the time range with offset and limit +func (c *Client) ReadingsByTimeRange(start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil } +// ReadingsByDeviceNameAndTimeRange query readings by the specified device, origin within the time range, offset, and limit +func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil, deviceNameCol) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, deviceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil +} + +// ReadingsByResourceNameAndTimeRange query readings by the specified resource, origin within the time range, offset, and limit +func (c *Client) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil, resourceNameCol) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil +} + +// ReadingsByDeviceNameAndResourceNameAndTimeRange query readings by the specified device and resource, origin within the time range, offset, and limit func (c *Client) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil, deviceNameCol, resourceNameCol) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, deviceName, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil +} + +// ReadingsByDeviceNameAndResourceNamesAndTimeRange query readings by the specified device and resourceName slice, origin within the time range, offset and limit +func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) { + ctx := context.Background() + + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, + []string{resourceNameCol}, deviceNameCol, resourceNameCol) + + // build the query args for the where condition using in querying readings and reading count + queryArgs := []any{start, end, deviceName, resourceNames} + // make a copy for query count args as we don't need offset and limit while querying total count + queryCountArgs := append([]any{}, queryArgs...) + // add offset and limit for query args + queryArgs = append(queryArgs, offset, limit) + + // query readings + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, queryArgs...) + if err != nil { + return nil, 0, errors.NewCommonEdgeXWrapper(err) + } + + // get the total count of readings based on the condition column names and query count args + totalCount, err := getTotalRowsCount(context.Background(), + c.ConnPool, + sqlQueryCountByTimeRangeCol(readingTableName, originCol, []string{resourceNameCol}, deviceNameCol, resourceNameCol), + queryCountArgs...) + if err != nil { + return nil, 0, errors.NewCommonEdgeXWrapper(err) + } + return readings, totalCount, nil } +// ReadingCountByDeviceName returns the count of Readings associated a specific Device from db func (c *Client) ReadingCountByDeviceName(deviceName string) (uint32, errors.EdgeX) { - return 0, nil + sqlStatement := sqlQueryCountByCol(readingTableName, deviceNameCol) + + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, deviceName) } +// ReadingCountByResourceName returns the count of Readings associated a specific resource from db func (c *Client) ReadingCountByResourceName(resourceName string) (uint32, errors.EdgeX) { - return 0, nil -} + sqlStatement := sqlQueryCountByCol(readingTableName, resourceNameCol) -func (c *Client) ReadingCountByResourceNameAndTimeRange(resourceName string, start int, end int) (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, resourceName) } +// ReadingCountByDeviceNameAndResourceName returns the count of readings associated a specific device and resource values from db func (c *Client) ReadingCountByDeviceNameAndResourceName(deviceName string, resourceName string) (uint32, errors.EdgeX) { - return 0, nil -} + sqlStatement := sqlQueryCountByCol(readingTableName, deviceNameCol, resourceNameCol) -func (c *Client) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, deviceName, resourceName) } +// ReadingCountByTimeRange returns the count of reading by origin within the time range from db func (c *Client) ReadingCountByTimeRange(start int, end int) (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil), start, end) } -func (c *Client) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil +// ReadingCountByDeviceNameAndTimeRange returns the count of readings by origin within the time range and the specified device from db +func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil, deviceNameCol), start, end, deviceName) } -func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) { - return nil, 0, nil +// ReadingCountByResourceNameAndTimeRange returns the count of readings by origin within the time range and the specified resource from db +func (c *Client) ReadingCountByResourceNameAndTimeRange(resourceName string, start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil, resourceNameCol), start, end, resourceName) } -func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil +// ReadingCountByDeviceNameAndResourceNameAndTimeRange returns the count of readings by origin within the time range +// associated with the specified device and resource from db +func (c *Client) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), + c.ConnPool, + sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil, deviceNameCol, resourceNameCol), + start, + end, + deviceName, + resourceName) } -func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { - return 0, nil +func (c *Client) LatestReadingByOffset(offset uint32) (model.Reading, errors.EdgeX) { + ctx := context.Background() + + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryAllWithPaginationDescByCol(readingTableName, originCol), offset, 1) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all readings", err) + } + + if len(readings) == 0 { + return nil, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("no reading found with offset '%d'", offset), err) + } + return readings[0], nil } -func (c *Client) LatestReadingByOffset(offset uint32) (model.Reading, errors.EdgeX) { - return nil, nil +// queryReadings queries the data rows with given sql statement and passed args, converts the rows to map and unmarshal the data rows to the Reading model slice +func queryReadings(ctx context.Context, connPool *pgxpool.Pool, sql string, args ...any) ([]model.Reading, errors.EdgeX) { + rows, err := connPool.Query(ctx, sql, args...) + if err != nil { + return nil, pgClient.WrapDBError("query failed", err) + } + + readings, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Reading, error) { + var reading model.Reading + + readingDBModel, err := pgx.RowToStructByNameLax[dbModels.Reading](row) + if err != nil { + return nil, pgClient.WrapDBError("failed to convert row to map", err) + } + + // convert the BaseReading fields to BaseReading struct defined in contract + baseReading := readingDBModel.GetBaseReading() + + if readingDBModel.BinaryValue != nil { + // reading type is BinaryReading + binaryReading := model.BinaryReading{ + BaseReading: baseReading, + MediaType: *readingDBModel.MediaType, + BinaryValue: readingDBModel.BinaryValue, + } + reading = binaryReading + } else if readingDBModel.ObjectValue != nil { + // reading type is ObjectReading + objReading := model.ObjectReading{ + BaseReading: baseReading, + ObjectValue: readingDBModel.ObjectValue, + } + reading = objReading + } else if readingDBModel.Value != nil { + // reading type is SimpleReading + simpleReading := model.SimpleReading{ + BaseReading: baseReading, + Value: *readingDBModel.Value, + } + reading = simpleReading + } else { + return reading, errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to none of BinaryReading/ObjectReading/SimpleReading structs", nil) + } + + return reading, nil + }) + + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + + return readings, nil +} + +// deleteReadings delete the data rows with given sql statement and passed args +func deleteReadings(ctx context.Context, tx pgx.Tx, args ...any) errors.EdgeX { + sqlStatement := sqlDeleteByColumn(readingTableName, eventIdFKCol) + commandTag, err := tx.Exec( + ctx, + sqlStatement, + args..., + ) + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no reading found", nil) + } + if err != nil { + return pgClient.WrapDBError("reading(s) delete failed", err) + } + return nil +} + +// deleteReadings delete the readings with event_id in the range of the sub query +func deleteReadingsBySubQuery(ctx context.Context, tx pgx.Tx, subQuerySql string, args ...any) errors.EdgeX { + sqlStatement := sqlDeleteByColumn(readingTableName, eventIdFKCol) + subQueryCond := "ANY ( " + subQuerySql + " )" + sqlStatement = strings.Replace(sqlStatement, "$1", subQueryCond, 1) + + commandTag, err := tx.Exec( + ctx, + sqlStatement, + args..., + ) + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no reading found", nil) + } + if err != nil { + return pgClient.WrapDBError("reading(s) delete failed", err) + } + return nil +} + +// addReadingsInTx converts reading interface to BinaryReading/ObjectReading/SimpleReading structs first based on the reading value type +// and then perform the CopyFromSlice transaction to insert readings in batch +func addReadingsInTx(tx pgx.Tx, readings []model.Reading, eventId string) error { + var readingDBModels []dbModels.Reading + + for _, r := range readings { + baseReading := r.GetBaseReading() + var readingDBModel dbModels.Reading + + switch contractReadingModel := r.(type) { + case model.BinaryReading: + // convert BinaryReading struct to Reading DB model + readingDBModel = dbModels.Reading{ + BaseReading: baseReading, + BinaryReading: dbModels.BinaryReading{ + BinaryValue: contractReadingModel.BinaryValue, + MediaType: &contractReadingModel.MediaType, + }, + } + case model.ObjectReading: + // convert ObjectReading struct to Reading DB model + readingDBModel = dbModels.Reading{ + BaseReading: baseReading, + ObjectReading: dbModels.ObjectReading{ + ObjectValue: contractReadingModel.ObjectValue, + }, + } + case model.SimpleReading: + // convert SimpleReading struct to Reading DB model + readingDBModel = dbModels.Reading{ + BaseReading: baseReading, + SimpleReading: dbModels.SimpleReading{Value: &contractReadingModel.Value}, + } + default: + return errors.NewCommonEdgeX(errors.KindContractInvalid, "failed to convert reading to none of BinaryReading/ObjectReading/SimpleReading structs", nil) + } + readingDBModels = append(readingDBModels, readingDBModel) + } + + // insert readingDBModels slice in batch + _, err := tx.CopyFrom( + context.Background(), + strings.Split(readingTableName, "."), + insertReadingCols, + pgx.CopyFromSlice(len(readingDBModels), func(i int) ([]any, error) { + var tagsBytes []byte + var objectValueBytes []byte + var err error + + r := readingDBModels[i] + if r.Tags != nil { + tagsBytes, err = json.Marshal(r.Tags) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) + } + } + if r.ObjectValue != nil { + objectValueBytes, err = json.Marshal(r.ObjectValue) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading ObjectValue", err) + } + } + return []any{ + r.Id, + eventId, + r.DeviceName, + r.ProfileName, + r.ResourceName, + r.Origin, + r.ValueType, + r.Units, + tagsBytes, + r.Value, + r.MediaType, + r.BinaryValue, + objectValueBytes, + }, nil + }), + ) + if err != nil { + return pgClient.WrapDBError("failed to insert readings in batch", err) + } + + return nil } diff --git a/internal/pkg/infrastructure/postgres/sql.go b/internal/pkg/infrastructure/postgres/sql.go index c790455281..825b399609 100644 --- a/internal/pkg/infrastructure/postgres/sql.go +++ b/internal/pkg/infrastructure/postgres/sql.go @@ -10,6 +10,7 @@ package postgres import ( "fmt" + "slices" "strings" ) @@ -55,6 +56,14 @@ func sqlInsertContent(table string) string { // return fmt.Sprintf("SELECT * FROM %s", table) //} +// sqlQueryFieldsByCol returns the SQL statement for selecting the given fields of rows from the table by the conditions composed of given columns +func sqlQueryFieldsByCol(table string, fields []string, columns ...string) string { + whereCondition := constructWhereCondition(columns...) + queryFieldStr := strings.Join(fields, ", ") + + return fmt.Sprintf("SELECT %s FROM %s WHERE %s", queryFieldStr, table, whereCondition) +} + // sqlQueryAllWithTimeRange returns the SQL statement for selecting all rows from the table with a time range. //func sqlQueryAllWithTimeRange(table string) string { // return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2", table, createdCol, createdCol) @@ -70,6 +79,11 @@ func sqlQueryAllWithPagination(table string) string { // return fmt.Sprintf("SELECT * FROM %s ORDER BY %s DESC OFFSET $1 LIMIT $2", table, createdCol) //} +// sqlQueryAllWithPaginationDescByCol returns the SQL statement for selecting all rows from the table with the pagination and desc by descCol +func sqlQueryAllWithPaginationDescByCol(table string, descCol string) string { + return fmt.Sprintf("SELECT * FROM %s ORDER BY %s DESC OFFSET $1 LIMIT $2", table, descCol) +} + // sqlQueryAllByColWithPagination returns the SQL statement for selecting all rows from the table by the given columns with pagination func sqlQueryAllByColWithPagination(table string, columns ...string) string { columnCount := len(columns) @@ -83,10 +97,14 @@ func sqlQueryAllWithPaginationAndTimeRange(table string) string { return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s OFFSET $3 LIMIT $4", table, createdCol, createdCol, createdCol) } -// sqlQueryAllWithPaginationAndTimeRangeDescByCol returns the SQL statement for selecting all rows from the table -// with pagination and a time range by column1, desc by column2 -func sqlQueryAllWithPaginationAndTimeRangeDescByCol(table string, timeRangeCol string, descCol string) string { - return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s DESC OFFSET $3 LIMIT $4", table, timeRangeCol, timeRangeCol, descCol) +// sqlQueryAllWithPaginationAndTimeRangeDescByCol returns the SQL statement for selecting all rows from the table with the arrayColNames slice, +// provided columns with pagination and a time range by timeRangeCol, desc by descCol +func sqlQueryAllWithPaginationAndTimeRangeDescByCol(table string, timeRangeCol string, descCol string, arrayColNames []string, columns ...string) string { + whereCondition := constructWhereCondWithTimeRange(timeRangeCol, arrayColNames, columns...) + columnCount := len(columns) + + return fmt.Sprintf("SELECT * FROM %s WHERE %s ORDER BY %s DESC OFFSET $%d LIMIT $%d", + table, whereCondition, descCol, columnCount+3, columnCount+4) } // sqlQueryAllByStatusWithPaginationAndTimeRange returns the SQL statement for selecting all rows from the table by status with pagination and a time range. @@ -161,8 +179,9 @@ func sqlQueryCountByCol(table string, columns ...string) string { // sqlQueryCountByTimeRangeCol returns the SQL statement for counting the number of rows in the table // by the given time range of the specified column -func sqlQueryCountByTimeRangeCol(table string, column string) string { - return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s >= $1 AND %s <= $2", table, column, column) +func sqlQueryCountByTimeRangeCol(table string, timeRangeCol string, arrayColNames []string, columns ...string) string { + whereCondition := constructWhereCondWithTimeRange(timeRangeCol, arrayColNames, columns...) + return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s", table, whereCondition) } // sqlQueryCountByJSONField returns the SQL statement for counting the number of rows in the table by the given JSON query string @@ -176,6 +195,13 @@ func sqlQueryCountByTimeRangeCol(table string, column string) string { // return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE (content->'%s')::bigint >= $1 AND (content->'%s')::bigint <= $2", table, field, field) //} +// sqlQueryFieldsByTimeRange returns the SQL statement for selecting fields from the table within the time range +func sqlQueryFieldsByTimeRange(table string, fields []string, timeRangeCol string) string { + queryFieldStr := strings.Join(fields, ", ") + + return fmt.Sprintf("SELECT %s FROM %s WHERE %s <= $1", queryFieldStr, table, timeRangeCol) +} + // ---------------------------------------------------------------------------------- // SQL statements for UPDATE operations // ---------------------------------------------------------------------------------- @@ -240,3 +266,23 @@ func constructWhereCondition(columns ...string) string { return strings.Join(conditions, " AND ") } + +// constructWhereCondition constructs the WHERE condition for the given columns with time range +// if arrayColNames is not empty, ANY operator will be added to accept the array argument for the specified array col names +func constructWhereCondWithTimeRange(timeRangeCol string, arrayColNames []string, columns ...string) string { + var hasArrayColumn bool + conditions := []string{timeRangeCol + " >= $1", timeRangeCol + " <= $2"} + + if len(arrayColNames) > 0 { + hasArrayColumn = true + } + for i, column := range columns { + equalCondition := "%s = $%d" + if hasArrayColumn && slices.Contains(arrayColNames, column) { + equalCondition = "%s = ANY ($%d)" + } + conditions = append(conditions, fmt.Sprintf(equalCondition, column, i+3)) + } + + return strings.Join(conditions, " AND ") +}