From 90ff5652c8dd6660c489f9c53674f245c29ab6dd Mon Sep 17 00:00:00 2001 From: Lindsey Cheng Date: Wed, 14 Aug 2024 14:41:51 +0800 Subject: [PATCH 1/4] feat: Add core-data postgres reading db client methods Relates to #4847. Add core-data reading related db client methods for postgres client. Signed-off-by: Lindsey Cheng --- cmd/core-data/res/db/sql/01-tables.sql | 20 + internal/pkg/db/postgres/utils.go | 8 +- .../pkg/infrastructure/postgres/consts.go | 34 ++ internal/pkg/infrastructure/postgres/event.go | 148 +++--- .../pkg/infrastructure/postgres/reading.go | 445 ++++++++++++++++-- internal/pkg/infrastructure/postgres/sql.go | 65 ++- 6 files changed, 631 insertions(+), 89 deletions(-) create mode 100644 internal/pkg/infrastructure/postgres/consts.go diff --git a/cmd/core-data/res/db/sql/01-tables.sql b/cmd/core-data/res/db/sql/01-tables.sql index 2e4308cdda..4e58b038d0 100644 --- a/cmd/core-data/res/db/sql/01-tables.sql +++ b/cmd/core-data/res/db/sql/01-tables.sql @@ -12,3 +12,23 @@ CREATE TABLE IF NOT EXISTS core_data.event ( origin BIGINT, tags JSONB ); + +-- core_data.reading is used to store the reading information +CREATE TABLE IF NOT EXISTS core_data.reading ( + id UUID PRIMARY KEY, + event_id UUID, + devicename TEXT, + profilename TEXT, + resourcename TEXT, + origin BIGINT, + valuetype TEXT DEFAULT '', + units TEXT DEFAULT '', + tags JSONB, + value TEXT, + mediatype TEXT, + binaryvalue BYTEA, + objectvalue JSONB, + CONSTRAINT fk_event + FOREIGN KEY(event_id) + REFERENCES core_data.event(id) +); diff --git a/internal/pkg/db/postgres/utils.go b/internal/pkg/db/postgres/utils.go index 6499dfbd09..3a982c04a7 100644 --- a/internal/pkg/db/postgres/utils.go +++ b/internal/pkg/db/postgres/utils.go @@ -130,7 +130,13 @@ func WrapDBError(message string, err error) errors.EdgeX { var pgErr *pgconn.PgError if goErrors.As(err, &pgErr) { if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) { - return errors.NewCommonEdgeX(errors.KindDuplicateName, pgErr.Detail, nil) + var errMsg string + if message != "" { + errMsg = message + ": " + } + errMsg += pgErr.Detail + + return errors.NewCommonEdgeX(errors.KindDuplicateName, errMsg, nil) } return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil) } diff --git a/internal/pkg/infrastructure/postgres/consts.go b/internal/pkg/infrastructure/postgres/consts.go new file mode 100644 index 0000000000..86b2f7ae8d --- /dev/null +++ b/internal/pkg/infrastructure/postgres/consts.go @@ -0,0 +1,34 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +// constants relate to the postgres db schema names +const ( + coreDataSchema = "core_data" +) + +// constants relate to the postgres db table names +const ( + eventTableName = coreDataSchema + ".event" + readingTableName = coreDataSchema + ".reading" +) + +// constants relate to the event/reading postgres db table column names +const ( + deviceNameCol = "devicename" + resourceNameCol = "resourcename" + profileNameCol = "profilename" + sourceNameCol = "sourcename" + originCol = "origin" + valueTypeCol = "valuetype" + unitsCol = "units" + tagsCol = "tags" + eventIdFKCol = "event_id" + valueCol = "value" + binaryValueCol = "binaryvalue" + mediaTypeCol = "mediatype" + objectValueCol = "objectvalue" +) diff --git a/internal/pkg/infrastructure/postgres/event.go b/internal/pkg/infrastructure/postgres/event.go index 59a4802e6f..620f52f3a6 100644 --- a/internal/pkg/infrastructure/postgres/event.go +++ b/internal/pkg/infrastructure/postgres/event.go @@ -22,17 +22,6 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -const ( - eventTableName = "core_data.event" - - // constants relate to the event struct field names - deviceNameCol = "devicename" - profileNameCol = "profilename" - sourceNameCol = "sourcename" - originCol = "origin" - tagsCol = "tags" -) - // AllEvents queries the events with the given range, offset, and limit func (c *Client) AllEvents(offset, limit int) ([]model.Event, errors.EdgeX) { ctx := context.Background() @@ -65,22 +54,33 @@ func (c *Client) AddEvent(e model.Event) (model.Event, errors.EdgeX) { return model.Event{}, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal event tags", err) } - _, err = c.ConnPool.Exec( - ctx, - sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol), - event.Id, - event.DeviceName, - event.ProfileName, - event.SourceName, - event.Origin, - tagsBytes, - ) - if err != nil { - return model.Event{}, pgClient.WrapDBError("failed to insert event", err) - } + err = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + // insert event in a transaction + _, err = tx.Exec( + ctx, + sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol), + event.Id, + event.DeviceName, + event.ProfileName, + event.SourceName, + event.Origin, + tagsBytes, + ) + if err != nil { + return pgClient.WrapDBError("failed to insert event", err) + } - // TODO: readings included in this event will be added to database in the following PRs + // insert readings in a transaction + err = addReadingsInTx(tx, e.Readings, e.Id) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + return nil + }) + if err != nil { + return model.Event{}, errors.NewCommonEdgeXWrapper(err) + } return event, nil } @@ -96,9 +96,16 @@ func (c *Client) EventById(id string) (model.Event, errors.EdgeX) { event, err = pgx.CollectExactlyOneRow(rows, func(row pgx.CollectableRow) (model.Event, error) { e, err := pgx.RowToStructByNameLax[model.Event](row) + if err != nil { + return model.Event{}, err + } - // TODO: readings data will be added to the event model in the following PRs + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryAllByCol(readingTableName, eventIdFKCol), e.Id) + if err != nil { + return model.Event{}, err + } + e.Readings = readings return e, err }) if err != nil { @@ -125,7 +132,7 @@ func (c *Client) EventCountByDeviceName(deviceName string) (uint32, errors.EdgeX // EventCountByTimeRange returns the count of Event by time range from db func (c *Client) EventCountByTimeRange(start int, end int) (uint32, errors.EdgeX) { - return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol), start, end) + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol, nil), start, end) } // EventsByDeviceName query events by offset, limit and device name @@ -142,7 +149,7 @@ func (c *Client) EventsByDeviceName(offset int, limit int, name string) ([]model // EventsByTimeRange query events by time range, offset, and limit func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([]model.Event, errors.EdgeX) { ctx := context.Background() - sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol) + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol, nil) events, err := queryEvents(ctx, c.ConnPool, sqlStatement, start, end, offset, limit) if err != nil { @@ -153,15 +160,23 @@ func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([ // DeleteEventById removes an event by id func (c *Client) DeleteEventById(id string) errors.EdgeX { + ctx := context.Background() sqlStatement := sqlDeleteById(eventTableName) - err := deleteEvents(context.Background(), c.ConnPool, sqlStatement, id) + // delete event and readings in a transaction + err := pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + if err := deleteReadings(ctx, tx, id); err != nil { + return err + } + + if err := deleteEvents(ctx, tx, sqlStatement, id); err != nil { + return err + } + return nil + }) if err != nil { - return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed delete event with id '%s'", id), err) + return errors.NewCommonEdgeX(errors.Kind(err), "failed delete event", err) } - - // TODO: delete related readings associated to the deleted events - return nil } @@ -173,14 +188,24 @@ func (c *Client) DeleteEventsByDeviceName(deviceName string) errors.EdgeX { sqlStatement := sqlDeleteByColumn(eventTableName, deviceNameCol) go func() { - err := deleteEvents(ctx, c.ConnPool, sqlStatement, deviceName) - if err != nil { - c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err) - } + // delete events and readings in a transaction + _ = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + // select the event-ids of the specified device name from event table as the sub-query of deleting readings + subSqlStatement := sqlQueryFieldsByCol(eventTableName, []string{idCol}, deviceNameCol) + if err := deleteReadingsBySubQuery(ctx, tx, subSqlStatement, deviceName); err != nil { + c.loggingClient.Errorf("failed delete readings with device '%s': %v", deviceName, err) + return err + } + + err := deleteEvents(ctx, tx, sqlStatement, deviceName) + if err != nil { + c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err) + return err + } + return nil + }) }() - // TODO: delete related readings associated to the deleted events - return nil } @@ -192,14 +217,24 @@ func (c *Client) DeleteEventsByAge(age int64) errors.EdgeX { sqlStatement := sqlDeleteTimeRangeByColumn(eventTableName, originCol) go func() { - err := deleteEvents(ctx, c.ConnPool, sqlStatement, expireTimestamp) - if err != nil { - c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err) - } + // delete events and readings in a transaction + _ = pgx.BeginFunc(ctx, c.ConnPool, func(tx pgx.Tx) error { + // select the event ids within the origin time range from event table as the sub-query of deleting readings + subSqlStatement := sqlQueryFieldsByTimeRange(eventTableName, []string{idCol}, originCol) + if err := deleteReadingsBySubQuery(ctx, tx, subSqlStatement, expireTimestamp); err != nil { + c.loggingClient.Errorf("failed delete readings by age '%d' nanoseconds: %v", age, err) + return err + } + + err := deleteEvents(ctx, tx, sqlStatement, expireTimestamp) + if err != nil { + c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err) + return err + } + return nil + }) }() - // TODO: delete related readings associated to the deleted events - return nil } @@ -213,10 +248,17 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args . var events []model.Event events, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Event, error) { event, err := pgx.RowToStructByNameLax[model.Event](row) + if err != nil { + return model.Event{}, err + } - // TODO: readings data will be added to the event model in the following PRs + readings, err := queryReadings(ctx, connPool, sqlQueryAllByCol(readingTableName, eventIdFKCol), event.Id) + if err != nil { + return model.Event{}, err + } - return event, err + event.Readings = readings + return event, nil }) if err != nil { @@ -226,18 +268,18 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args . return events, nil } -// deleteEvents delete the data rows with given sql statement and passed args -func deleteEvents(ctx context.Context, connPool *pgxpool.Pool, sqlStatement string, args ...any) errors.EdgeX { - commandTag, err := connPool.Exec( +// deleteEvents delete the data rows with given sql statement and passed args in a db transaction +func deleteEvents(ctx context.Context, tx pgx.Tx, sqlStatement string, args ...any) errors.EdgeX { + commandTag, err := tx.Exec( ctx, sqlStatement, args..., ) - if commandTag.RowsAffected() == 0 { - return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil) - } if err != nil { return pgClient.WrapDBError("event(s) delete failed", err) } + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil) + } return nil } diff --git a/internal/pkg/infrastructure/postgres/reading.go b/internal/pkg/infrastructure/postgres/reading.go index 6969f51211..56279655ad 100644 --- a/internal/pkg/infrastructure/postgres/reading.go +++ b/internal/pkg/infrastructure/postgres/reading.go @@ -6,78 +6,465 @@ package postgres import ( + "context" + "encoding/json" + "fmt" + "strings" + + pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres" + + "github.com/edgexfoundry/go-mod-core-contracts/v3/common" "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" model "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) +var insertBaseReadingCol = []string{idCol, eventIdFKCol, deviceNameCol, profileNameCol, resourceNameCol, originCol, valueTypeCol, unitsCol, tagsCol} + func (c *Client) ReadingTotalCount() (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCount(readingTableName)) } func (c *Client) AllReadings(offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil -} + ctx := context.Background() -func (c *Client) ReadingsByTimeRange(start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryAllWithPagination(readingTableName), offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all readings", err) + } + + return readings, nil } +// ReadingsByResourceName query readings by offset, limit and resource name func (c *Client) ReadingsByResourceName(offset int, limit int, resourceName string) ([]model.Reading, errors.EdgeX) { - return nil, nil + sqlStatement := sqlQueryAllByColWithPagination(readingTableName, resourceNameCol) + + readings, err := queryReadings(context.Background(), c.ConnPool, sqlStatement, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query readings by resource '%s'", resourceName), err) + } + return readings, nil } +// ReadingsByDeviceName query readings by offset, limit and device name func (c *Client) ReadingsByDeviceName(offset int, limit int, name string) ([]model.Reading, errors.EdgeX) { - return nil, nil + sqlStatement := sqlQueryAllByColWithPagination(readingTableName, deviceNameCol) + + readings, err := queryReadings(context.Background(), c.ConnPool, sqlStatement, name, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query readings by device '%s'", name), err) + } + return readings, nil } +// ReadingsByDeviceNameAndResourceName query readings by offset, limit, device name and resource name func (c *Client) ReadingsByDeviceNameAndResourceName(deviceName string, resourceName string, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil + sqlStatement := sqlQueryAllByColWithPagination(readingTableName, deviceNameCol, resourceNameCol) + + readings, err := queryReadings(context.Background(), c.ConnPool, sqlStatement, deviceName, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, + fmt.Sprintf("failed to query readings by device '%s' and resource '%s'", deviceName, resourceName), err) + } + return readings, nil +} + +// ReadingsByTimeRange query readings by origin within the time range with offset and limit +func (c *Client) ReadingsByTimeRange(start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil +} + +// ReadingsByDeviceNameAndTimeRange query readings by the specified device, origin within the time range, offset, and limit +func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil, deviceNameCol) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, deviceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil } +// ReadingsByResourceNameAndTimeRange query readings by the specified resource, origin within the time range, offset, and limit +func (c *Client) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil, resourceNameCol) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil +} + +// ReadingsByDeviceNameAndResourceNameAndTimeRange query readings by the specified device and resource, origin within the time range, offset, and limit func (c *Client) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, nil, deviceNameCol, resourceNameCol) + + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, start, end, deviceName, resourceName, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return readings, nil } +// ReadingsByDeviceNameAndResourceNamesAndTimeRange query readings by the specified device and resourceName slice, origin within the time range, offset and limit +func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) { + ctx := context.Background() + + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(readingTableName, originCol, originCol, + []string{resourceNameCol}, deviceNameCol, resourceNameCol) + + // build the query args for the where condition using in querying readings and reading count + queryArgs := []any{start, end, deviceName, resourceNames} + // make a copy for query count args as we don't need offset and limit while querying total count + queryCountArgs := append([]any{}, queryArgs...) + // add offset and limit for query args + queryArgs = append(queryArgs, offset, limit) + + // query readings + readings, err := queryReadings(ctx, c.ConnPool, sqlStatement, queryArgs...) + if err != nil { + return nil, 0, errors.NewCommonEdgeXWrapper(err) + } + + // get the total count of readings based on the condition column names and query count args + totalCount, err := getTotalRowsCount(context.Background(), + c.ConnPool, + sqlQueryCountByTimeRangeCol(readingTableName, originCol, []string{resourceNameCol}, deviceNameCol, resourceNameCol), + queryCountArgs...) + if err != nil { + return nil, 0, errors.NewCommonEdgeXWrapper(err) + } + return readings, totalCount, nil +} + +// ReadingCountByDeviceName returns the count of Readings associated a specific Device from db func (c *Client) ReadingCountByDeviceName(deviceName string) (uint32, errors.EdgeX) { - return 0, nil + sqlStatement := sqlQueryCountByCol(readingTableName, deviceNameCol) + + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, deviceName) } +// ReadingCountByResourceName returns the count of Readings associated a specific resource from db func (c *Client) ReadingCountByResourceName(resourceName string) (uint32, errors.EdgeX) { - return 0, nil -} + sqlStatement := sqlQueryCountByCol(readingTableName, resourceNameCol) -func (c *Client) ReadingCountByResourceNameAndTimeRange(resourceName string, start int, end int) (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, resourceName) } +// ReadingCountByDeviceNameAndResourceName returns the count of readings associated a specific device and resource values from db func (c *Client) ReadingCountByDeviceNameAndResourceName(deviceName string, resourceName string) (uint32, errors.EdgeX) { - return 0, nil -} + sqlStatement := sqlQueryCountByCol(readingTableName, deviceNameCol, resourceNameCol) -func (c *Client) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, deviceName, resourceName) } +// ReadingCountByTimeRange returns the count of reading by origin within the time range from db func (c *Client) ReadingCountByTimeRange(start int, end int) (uint32, errors.EdgeX) { - return 0, nil + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil), start, end) } -func (c *Client) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil +// ReadingCountByDeviceNameAndTimeRange returns the count of readings by origin within the time range and the specified device from db +func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil, deviceNameCol), start, end, deviceName) } -func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) { - return nil, 0, nil +// ReadingCountByResourceNameAndTimeRange returns the count of readings by origin within the time range and the specified resource from db +func (c *Client) ReadingCountByResourceNameAndTimeRange(resourceName string, start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil, resourceNameCol), start, end, resourceName) } -func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { - return nil, nil +// ReadingCountByDeviceNameAndResourceNameAndTimeRange returns the count of readings by origin within the time range +// associated with the specified device and resource from db +func (c *Client) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), + c.ConnPool, + sqlQueryCountByTimeRangeCol(readingTableName, originCol, nil, deviceNameCol, resourceNameCol), + start, + end, + deviceName, + resourceName) } -func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { - return 0, nil +func (c *Client) LatestReadingByOffset(offset uint32) (model.Reading, errors.EdgeX) { + ctx := context.Background() + + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryAllWithPaginationDescByCol(readingTableName, originCol), offset, 1) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all readings", err) + } + + if len(readings) == 0 { + return nil, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("no reading found with offset '%d'", offset), err) + } + return readings[0], nil } -func (c *Client) LatestReadingByOffset(offset uint32) (model.Reading, errors.EdgeX) { - return nil, nil +// queryReadings queries the data rows with given sql statement and passed args, converts the rows to map and unmarshal the data rows to the Reading model slice +func queryReadings(ctx context.Context, connPool *pgxpool.Pool, sql string, args ...any) ([]model.Reading, errors.EdgeX) { + rows, err := connPool.Query(ctx, sql, args...) + if err != nil { + return nil, pgClient.WrapDBError("query failed", err) + } + + readings, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Reading, error) { + var reading model.Reading + var baseReading model.BaseReading + + readingMap, err := pgx.RowToMap(row) + if err != nil { + return nil, pgClient.WrapDBError("failed to convert row to map", err) + } + + // retrieve the BaseReading fields from the map + if id, ok := readingMap[idCol].([16]uint8); ok { + baseReading.Id = fmt.Sprintf("%x-%x-%x-%x-%x", id[0:4], id[4:6], id[6:8], id[8:10], id[10:16]) + } + if origin, ok := readingMap[originCol].(int64); ok { + baseReading.Origin = origin + } + if deviceName, ok := readingMap[deviceNameCol].(string); ok { + baseReading.DeviceName = deviceName + } + if profileName, ok := readingMap[profileNameCol].(string); ok { + baseReading.ProfileName = profileName + } + if resourceName, ok := readingMap[resourceNameCol].(string); ok { + baseReading.ResourceName = resourceName + } + if valueType, ok := readingMap[valueTypeCol].(string); ok { + baseReading.ValueType = valueType + } + if units, ok := readingMap[unitsCol].(string); ok { + baseReading.Units = units + } + if tags, ok := readingMap[tagsCol].(map[string]any); ok { + baseReading.Tags = tags + } + + value, ok := readingMap[valueCol].(string) + if ok && value != "" { + // reading type is SimpleReading + simpleReading := model.SimpleReading{ + BaseReading: baseReading, + Value: value, + } + reading = simpleReading + } else { + // reading type is not SimpleReading, check if the reading belongs to either BinaryReading or ObjectReading + if baseReading.ValueType == common.ValueTypeBinary { + // reading type is BinaryReading + binaryReading := model.BinaryReading{ + BaseReading: baseReading, + } + if mediaType, ok := readingMap[mediaTypeCol].(string); ok { + binaryReading.MediaType = mediaType + } + if binaryValue, ok := readingMap[binaryValueCol].([]byte); ok { + binaryReading.BinaryValue = binaryValue + } + reading = binaryReading + } else if baseReading.ValueType == common.ValueTypeObject { + // reading type is ObjectReading + objReading := model.ObjectReading{ + BaseReading: baseReading, + } + if objValue, ok := readingMap[objectValueCol]; ok { + objReading.ObjectValue = objValue + } + reading = objReading + } else { + return nil, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("unknown reading value type '%s'", baseReading.ValueType), nil) + } + } + + return reading, nil + }) + + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + + return readings, nil +} + +// deleteReadings delete the data rows with given sql statement and passed args +func deleteReadings(ctx context.Context, tx pgx.Tx, args ...any) errors.EdgeX { + sqlStatement := sqlDeleteByColumn(readingTableName, eventIdFKCol) + commandTag, err := tx.Exec( + ctx, + sqlStatement, + args..., + ) + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no reading found", nil) + } + if err != nil { + return pgClient.WrapDBError("reading(s) delete failed", err) + } + return nil +} + +// deleteReadings delete the readings with event_id in the range of the sub query +func deleteReadingsBySubQuery(ctx context.Context, tx pgx.Tx, subQuerySql string, args ...any) errors.EdgeX { + sqlStatement := sqlDeleteByColumn(readingTableName, eventIdFKCol) + subQueryCond := "ANY ( " + subQuerySql + " )" + sqlStatement = strings.Replace(sqlStatement, "$1", subQueryCond, 1) + + commandTag, err := tx.Exec( + ctx, + sqlStatement, + args..., + ) + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no reading found", nil) + } + if err != nil { + return pgClient.WrapDBError("reading(s) delete failed", err) + } + return nil +} + +// addReadingsInTx converts reading interface to BinaryReading/ObjectReading/SimpleReading structs first based on the reading value type +// and then perform the CopyFromSlice transaction to insert readings in batch +func addReadingsInTx(tx pgx.Tx, readings []model.Reading, eventId string) error { + var binaryReadings []model.BinaryReading + var objReadings []model.ObjectReading + var simpleReadings []model.SimpleReading + + var err error + + for _, r := range readings { + baseReading := r.GetBaseReading() + + valueType := baseReading.ValueType + if valueType == common.ValueTypeBinary { + // convert reading to BinaryReading struct + b, ok := r.(model.BinaryReading) + if !ok { + return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to BinaryReading model", nil) + } + binaryReadings = append(binaryReadings, b) + } else if valueType == common.ValueTypeObject { + // convert reading to ObjectReading struct + o, ok := r.(model.ObjectReading) + if !ok { + return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to ObjectReading model", nil) + } + objReadings = append(objReadings, o) + } else { + // convert reading to SimpleReading struct + s, ok := r.(model.SimpleReading) + if !ok { + return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to SimpleReading model", nil) + } + simpleReadings = append(simpleReadings, s) + } + } + + // insert binary readings in batch + if len(binaryReadings) > 0 { + binaryReadingCols := append(insertBaseReadingCol, mediaTypeCol, binaryValueCol) + + _, err = tx.CopyFrom( + context.Background(), + strings.Split(readingTableName, "."), + binaryReadingCols, + pgx.CopyFromSlice(len(binaryReadings), func(i int) ([]any, error) { + tagsBytes, err := json.Marshal(binaryReadings[i].Tags) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) + } + return []any{ + binaryReadings[i].Id, + eventId, + binaryReadings[i].DeviceName, + binaryReadings[i].ProfileName, + binaryReadings[i].ResourceName, + binaryReadings[i].Origin, + binaryReadings[i].ValueType, + binaryReadings[i].Units, + tagsBytes, + binaryReadings[i].MediaType, + binaryReadings[i].BinaryValue, + }, nil + }), + ) + } + + // insert object readings in batch + if len(objReadings) > 0 { + objReadingCols := append(insertBaseReadingCol, objectValueCol) + + _, err = tx.CopyFrom( + context.Background(), + strings.Split(readingTableName, "."), + objReadingCols, + pgx.CopyFromSlice(len(objReadings), func(i int) ([]any, error) { + tagsBytes, err := json.Marshal(objReadings[i].Tags) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) + } + return []any{ + objReadings[i].Id, + eventId, + objReadings[i].DeviceName, + objReadings[i].ProfileName, + objReadings[i].ResourceName, + objReadings[i].Origin, + objReadings[i].ValueType, + objReadings[i].Units, + tagsBytes, + objReadings[i].ObjectValue, + }, nil + }), + ) + } + + // insert simple readings in batch + if len(simpleReadings) > 0 { + simpleReadingCols := append(insertBaseReadingCol, valueCol) + + _, err = tx.CopyFrom( + context.Background(), + strings.Split(readingTableName, "."), + simpleReadingCols, + pgx.CopyFromSlice(len(simpleReadings), func(i int) ([]any, error) { + tagsBytes, err := json.Marshal(simpleReadings[i].Tags) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) + } + return []any{ + simpleReadings[i].Id, + eventId, + simpleReadings[i].DeviceName, + simpleReadings[i].ProfileName, + simpleReadings[i].ResourceName, + simpleReadings[i].Origin, + simpleReadings[i].ValueType, + simpleReadings[i].Units, + tagsBytes, + simpleReadings[i].Value, + }, nil + }), + ) + } + if err != nil { + return pgClient.WrapDBError("failed to insert readings", err) + } + + return nil } diff --git a/internal/pkg/infrastructure/postgres/sql.go b/internal/pkg/infrastructure/postgres/sql.go index c790455281..830e02a4ad 100644 --- a/internal/pkg/infrastructure/postgres/sql.go +++ b/internal/pkg/infrastructure/postgres/sql.go @@ -10,6 +10,7 @@ package postgres import ( "fmt" + "slices" "strings" ) @@ -55,6 +56,14 @@ func sqlInsertContent(table string) string { // return fmt.Sprintf("SELECT * FROM %s", table) //} +// sqlQueryAllByCol returns the SQL statement for selecting all rows from the table by the given columns +func sqlQueryFieldsByCol(table string, fields []string, columns ...string) string { + whereCondition := constructWhereCondition(columns...) + queryFieldStr := strings.Join(fields, ", ") + + return fmt.Sprintf("SELECT %s FROM %s WHERE %s", queryFieldStr, table, whereCondition) +} + // sqlQueryAllWithTimeRange returns the SQL statement for selecting all rows from the table with a time range. //func sqlQueryAllWithTimeRange(table string) string { // return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2", table, createdCol, createdCol) @@ -70,6 +79,11 @@ func sqlQueryAllWithPagination(table string) string { // return fmt.Sprintf("SELECT * FROM %s ORDER BY %s DESC OFFSET $1 LIMIT $2", table, createdCol) //} +// sqlQueryAllWithPaginationDescByCol returns the SQL statement for selecting all rows from the table with the pagination and desc by descCol +func sqlQueryAllWithPaginationDescByCol(table string, descCol string) string { + return fmt.Sprintf("SELECT * FROM %s ORDER BY %s DESC OFFSET $1 LIMIT $2", table, descCol) +} + // sqlQueryAllByColWithPagination returns the SQL statement for selecting all rows from the table by the given columns with pagination func sqlQueryAllByColWithPagination(table string, columns ...string) string { columnCount := len(columns) @@ -83,10 +97,14 @@ func sqlQueryAllWithPaginationAndTimeRange(table string) string { return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s OFFSET $3 LIMIT $4", table, createdCol, createdCol, createdCol) } -// sqlQueryAllWithPaginationAndTimeRangeDescByCol returns the SQL statement for selecting all rows from the table -// with pagination and a time range by column1, desc by column2 -func sqlQueryAllWithPaginationAndTimeRangeDescByCol(table string, timeRangeCol string, descCol string) string { - return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s DESC OFFSET $3 LIMIT $4", table, timeRangeCol, timeRangeCol, descCol) +// sqlQueryAllWithPaginationAndTimeRangeDescByCol returns the SQL statement for selecting all rows from the table with the arrayColNames slice, +// provided columns with pagination and a time range by timeRangeCol, desc by descCol +func sqlQueryAllWithPaginationAndTimeRangeDescByCol(table string, timeRangeCol string, descCol string, arrayColNames []string, columns ...string) string { + whereCondition := constructWhereCondWithTimeRange(timeRangeCol, arrayColNames, columns...) + columnCount := len(columns) + + return fmt.Sprintf("SELECT * FROM %s WHERE %s ORDER BY %s DESC OFFSET $%d LIMIT $%d", + table, whereCondition, descCol, columnCount+3, columnCount+4) } // sqlQueryAllByStatusWithPaginationAndTimeRange returns the SQL statement for selecting all rows from the table by status with pagination and a time range. @@ -161,8 +179,9 @@ func sqlQueryCountByCol(table string, columns ...string) string { // sqlQueryCountByTimeRangeCol returns the SQL statement for counting the number of rows in the table // by the given time range of the specified column -func sqlQueryCountByTimeRangeCol(table string, column string) string { - return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s >= $1 AND %s <= $2", table, column, column) +func sqlQueryCountByTimeRangeCol(table string, timeRangeCol string, arrayColNames []string, columns ...string) string { + whereCondition := constructWhereCondWithTimeRange(timeRangeCol, arrayColNames, columns...) + return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s", table, whereCondition) } // sqlQueryCountByJSONField returns the SQL statement for counting the number of rows in the table by the given JSON query string @@ -176,6 +195,20 @@ func sqlQueryCountByTimeRangeCol(table string, column string) string { // return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE (content->'%s')::bigint >= $1 AND (content->'%s')::bigint <= $2", table, field, field) //} +// sqlQueryAllByCol returns the SQL statement for selecting all rows from the table by the given column in WHERE condition +func sqlQueryAllByCol(table string, columns ...string) string { + whereCondition := constructWhereCondition(columns...) + + return fmt.Sprintf("SELECT * FROM %s WHERE %s", table, whereCondition) +} + +// sqlQueryFieldsByTimeRange returns the SQL statement for selecting fields from the table within the time range +func sqlQueryFieldsByTimeRange(table string, fields []string, timeRangeCol string) string { + queryFieldStr := strings.Join(fields, ", ") + + return fmt.Sprintf("SELECT %s FROM %s WHERE %s <= $1", queryFieldStr, table, timeRangeCol) +} + // ---------------------------------------------------------------------------------- // SQL statements for UPDATE operations // ---------------------------------------------------------------------------------- @@ -240,3 +273,23 @@ func constructWhereCondition(columns ...string) string { return strings.Join(conditions, " AND ") } + +// constructWhereCondition constructs the WHERE condition for the given columns with time range +// if arrayColNames is not empty, ANY operator will be added to accept the array argument for the specified array col names +func constructWhereCondWithTimeRange(timeRangeCol string, arrayColNames []string, columns ...string) string { + var hasArrayColumn bool + conditions := []string{timeRangeCol + " >= $1", timeRangeCol + " <= $2"} + + if len(arrayColNames) > 0 { + hasArrayColumn = true + } + for i, column := range columns { + equalCondition := "%s = $%d" + if hasArrayColumn && slices.Contains(arrayColNames, column) { + equalCondition = "%s = ANY ($%d)" + } + conditions = append(conditions, fmt.Sprintf(equalCondition, column, i+3)) + } + + return strings.Join(conditions, " AND ") +} From 0ed9d1bb2dca2b64ca9a9223bf65058b9206a656 Mon Sep 17 00:00:00 2001 From: Lindsey Cheng Date: Thu, 15 Aug 2024 17:16:25 +0800 Subject: [PATCH 2/4] fix: Add a new Reading model for postgres Add a new Reading model for postgres for insert/query reading usage. Signed-off-by: Lindsey Cheng --- internal/pkg/infrastructure/postgres/event.go | 4 +- .../infrastructure/postgres/models/reading.go | 44 ++++ .../pkg/infrastructure/postgres/reading.go | 240 +++++++----------- 3 files changed, 137 insertions(+), 151 deletions(-) create mode 100644 internal/pkg/infrastructure/postgres/models/reading.go diff --git a/internal/pkg/infrastructure/postgres/event.go b/internal/pkg/infrastructure/postgres/event.go index 620f52f3a6..7127d8a1c7 100644 --- a/internal/pkg/infrastructure/postgres/event.go +++ b/internal/pkg/infrastructure/postgres/event.go @@ -100,7 +100,7 @@ func (c *Client) EventById(id string) (model.Event, errors.EdgeX) { return model.Event{}, err } - readings, err := queryReadings(ctx, c.ConnPool, sqlQueryAllByCol(readingTableName, eventIdFKCol), e.Id) + readings, err := queryReadings(ctx, c.ConnPool, sqlQueryFieldsByCol(readingTableName, queryReadingCols, eventIdFKCol), e.Id) if err != nil { return model.Event{}, err } @@ -252,7 +252,7 @@ func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args . return model.Event{}, err } - readings, err := queryReadings(ctx, connPool, sqlQueryAllByCol(readingTableName, eventIdFKCol), event.Id) + readings, err := queryReadings(ctx, connPool, sqlQueryFieldsByCol(readingTableName, queryReadingCols, eventIdFKCol), event.Id) if err != nil { return model.Event{}, err } diff --git a/internal/pkg/infrastructure/postgres/models/reading.go b/internal/pkg/infrastructure/postgres/models/reading.go new file mode 100644 index 0000000000..7d5de1fabf --- /dev/null +++ b/internal/pkg/infrastructure/postgres/models/reading.go @@ -0,0 +1,44 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package models + +import "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + +// Reading struct contains the columns of the core_data.reading table in Postgres db relates to a reading +// which includes all the fields in BaseReading, BinaryReading, SimpleReading and ObjectReading +type Reading struct { + models.BaseReading + BinaryReading + SimpleReading + ObjectReading +} + +type SimpleReading struct { + Value *string +} + +type BinaryReading struct { + BinaryValue []byte + MediaType *string +} + +type ObjectReading struct { + ObjectValue any +} + +// GetBaseReading makes the Reading struct to implement the go-mod-core-contract Reading interface in models +func (r Reading) GetBaseReading() models.BaseReading { + return models.BaseReading{ + Id: r.Id, + Origin: r.Origin, + DeviceName: r.DeviceName, + ResourceName: r.ResourceName, + ProfileName: r.ProfileName, + ValueType: r.ValueType, + Units: r.Units, + Tags: r.Tags, + } +} diff --git a/internal/pkg/infrastructure/postgres/reading.go b/internal/pkg/infrastructure/postgres/reading.go index 56279655ad..fc55aa4b5a 100644 --- a/internal/pkg/infrastructure/postgres/reading.go +++ b/internal/pkg/infrastructure/postgres/reading.go @@ -12,6 +12,7 @@ import ( "strings" pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres" + dbModels "github.com/edgexfoundry/edgex-go/internal/pkg/infrastructure/postgres/models" "github.com/edgexfoundry/go-mod-core-contracts/v3/common" "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" @@ -21,7 +22,12 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -var insertBaseReadingCol = []string{idCol, eventIdFKCol, deviceNameCol, profileNameCol, resourceNameCol, originCol, valueTypeCol, unitsCol, tagsCol} +var ( + // insertReadingCols defines the reading table columns in slice used in inserting readings + insertReadingCols = []string{idCol, eventIdFKCol, deviceNameCol, profileNameCol, resourceNameCol, originCol, valueTypeCol, unitsCol, tagsCol, valueCol, mediaTypeCol, binaryValueCol, objectValueCol} + // queryReadingCols defines the reading table columns in slice used in querying reading + queryReadingCols = []string{idCol, deviceNameCol, profileNameCol, resourceNameCol, originCol, valueTypeCol, unitsCol, tagsCol, valueCol, objectValueCol, mediaTypeCol, binaryValueCol} +) func (c *Client) ReadingTotalCount() (uint32, errors.EdgeX) { return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCount(readingTableName)) @@ -222,73 +228,39 @@ func queryReadings(ctx context.Context, connPool *pgxpool.Pool, sql string, args readings, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Reading, error) { var reading model.Reading - var baseReading model.BaseReading - readingMap, err := pgx.RowToMap(row) + readingDBModel, err := pgx.RowToStructByNameLax[dbModels.Reading](row) if err != nil { return nil, pgClient.WrapDBError("failed to convert row to map", err) } - // retrieve the BaseReading fields from the map - if id, ok := readingMap[idCol].([16]uint8); ok { - baseReading.Id = fmt.Sprintf("%x-%x-%x-%x-%x", id[0:4], id[4:6], id[6:8], id[8:10], id[10:16]) - } - if origin, ok := readingMap[originCol].(int64); ok { - baseReading.Origin = origin - } - if deviceName, ok := readingMap[deviceNameCol].(string); ok { - baseReading.DeviceName = deviceName - } - if profileName, ok := readingMap[profileNameCol].(string); ok { - baseReading.ProfileName = profileName - } - if resourceName, ok := readingMap[resourceNameCol].(string); ok { - baseReading.ResourceName = resourceName - } - if valueType, ok := readingMap[valueTypeCol].(string); ok { - baseReading.ValueType = valueType - } - if units, ok := readingMap[unitsCol].(string); ok { - baseReading.Units = units - } - if tags, ok := readingMap[tagsCol].(map[string]any); ok { - baseReading.Tags = tags - } + // convert the BaseReading fields to BaseReading struct defined in contract + baseReading := readingDBModel.GetBaseReading() + + valueType := baseReading.ValueType - value, ok := readingMap[valueCol].(string) - if ok && value != "" { + if valueType == common.ValueTypeBinary { + // reading type is BinaryReading + binaryReading := model.BinaryReading{ + BaseReading: baseReading, + MediaType: *readingDBModel.MediaType, + BinaryValue: readingDBModel.BinaryValue, + } + reading = binaryReading + } else if baseReading.ValueType == common.ValueTypeObject { + // reading type is ObjectReading + objReading := model.ObjectReading{ + BaseReading: baseReading, + ObjectValue: readingDBModel.ObjectValue, + } + reading = objReading + } else { // reading type is SimpleReading simpleReading := model.SimpleReading{ BaseReading: baseReading, - Value: value, + Value: *readingDBModel.Value, } reading = simpleReading - } else { - // reading type is not SimpleReading, check if the reading belongs to either BinaryReading or ObjectReading - if baseReading.ValueType == common.ValueTypeBinary { - // reading type is BinaryReading - binaryReading := model.BinaryReading{ - BaseReading: baseReading, - } - if mediaType, ok := readingMap[mediaTypeCol].(string); ok { - binaryReading.MediaType = mediaType - } - if binaryValue, ok := readingMap[binaryValueCol].([]byte); ok { - binaryReading.BinaryValue = binaryValue - } - reading = binaryReading - } else if baseReading.ValueType == common.ValueTypeObject { - // reading type is ObjectReading - objReading := model.ObjectReading{ - BaseReading: baseReading, - } - if objValue, ok := readingMap[objectValueCol]; ok { - objReading.ObjectValue = objValue - } - reading = objReading - } else { - return nil, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("unknown reading value type '%s'", baseReading.ValueType), nil) - } } return reading, nil @@ -341,15 +313,11 @@ func deleteReadingsBySubQuery(ctx context.Context, tx pgx.Tx, subQuerySql string // addReadingsInTx converts reading interface to BinaryReading/ObjectReading/SimpleReading structs first based on the reading value type // and then perform the CopyFromSlice transaction to insert readings in batch func addReadingsInTx(tx pgx.Tx, readings []model.Reading, eventId string) error { - var binaryReadings []model.BinaryReading - var objReadings []model.ObjectReading - var simpleReadings []model.SimpleReading - - var err error + var readingDBModels []dbModels.Reading for _, r := range readings { baseReading := r.GetBaseReading() - + var readingDBModel dbModels.Reading valueType := baseReading.ValueType if valueType == common.ValueTypeBinary { // convert reading to BinaryReading struct @@ -357,113 +325,87 @@ func addReadingsInTx(tx pgx.Tx, readings []model.Reading, eventId string) error if !ok { return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to BinaryReading model", nil) } - binaryReadings = append(binaryReadings, b) + + // convert BinaryReading struct to Reading DB model + readingDBModel = dbModels.Reading{ + BaseReading: baseReading, + BinaryReading: dbModels.BinaryReading{ + BinaryValue: b.BinaryValue, + MediaType: &b.MediaType, + }, + } } else if valueType == common.ValueTypeObject { // convert reading to ObjectReading struct o, ok := r.(model.ObjectReading) if !ok { return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to ObjectReading model", nil) } - objReadings = append(objReadings, o) + + // convert ObjectReading struct to Reading DB model + readingDBModel = dbModels.Reading{ + BaseReading: baseReading, + ObjectReading: dbModels.ObjectReading{ + ObjectValue: o.ObjectValue, + }, + } } else { // convert reading to SimpleReading struct s, ok := r.(model.SimpleReading) if !ok { return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to SimpleReading model", nil) } - simpleReadings = append(simpleReadings, s) - } - } - // insert binary readings in batch - if len(binaryReadings) > 0 { - binaryReadingCols := append(insertBaseReadingCol, mediaTypeCol, binaryValueCol) - - _, err = tx.CopyFrom( - context.Background(), - strings.Split(readingTableName, "."), - binaryReadingCols, - pgx.CopyFromSlice(len(binaryReadings), func(i int) ([]any, error) { - tagsBytes, err := json.Marshal(binaryReadings[i].Tags) - if err != nil { - return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) - } - return []any{ - binaryReadings[i].Id, - eventId, - binaryReadings[i].DeviceName, - binaryReadings[i].ProfileName, - binaryReadings[i].ResourceName, - binaryReadings[i].Origin, - binaryReadings[i].ValueType, - binaryReadings[i].Units, - tagsBytes, - binaryReadings[i].MediaType, - binaryReadings[i].BinaryValue, - }, nil - }), - ) + // convert SimpleReading struct to Reading DB model + readingDBModel = dbModels.Reading{ + BaseReading: baseReading, + SimpleReading: dbModels.SimpleReading{Value: &s.Value}, + } + } + readingDBModels = append(readingDBModels, readingDBModel) } - // insert object readings in batch - if len(objReadings) > 0 { - objReadingCols := append(insertBaseReadingCol, objectValueCol) - - _, err = tx.CopyFrom( - context.Background(), - strings.Split(readingTableName, "."), - objReadingCols, - pgx.CopyFromSlice(len(objReadings), func(i int) ([]any, error) { - tagsBytes, err := json.Marshal(objReadings[i].Tags) + // insert readingDBModels slice in batch + _, err := tx.CopyFrom( + context.Background(), + strings.Split(readingTableName, "."), + insertReadingCols, + pgx.CopyFromSlice(len(readingDBModels), func(i int) ([]any, error) { + var tagsBytes []byte + var objectValueBytes []byte + var err error + + r := readingDBModels[i] + if r.Tags != nil { + tagsBytes, err = json.Marshal(r.Tags) if err != nil { return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) } - return []any{ - objReadings[i].Id, - eventId, - objReadings[i].DeviceName, - objReadings[i].ProfileName, - objReadings[i].ResourceName, - objReadings[i].Origin, - objReadings[i].ValueType, - objReadings[i].Units, - tagsBytes, - objReadings[i].ObjectValue, - }, nil - }), - ) - } - - // insert simple readings in batch - if len(simpleReadings) > 0 { - simpleReadingCols := append(insertBaseReadingCol, valueCol) - - _, err = tx.CopyFrom( - context.Background(), - strings.Split(readingTableName, "."), - simpleReadingCols, - pgx.CopyFromSlice(len(simpleReadings), func(i int) ([]any, error) { - tagsBytes, err := json.Marshal(simpleReadings[i].Tags) + } + if r.ObjectValue != nil { + objectValueBytes, err = json.Marshal(r.ObjectValue) if err != nil { - return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading tags", err) + return nil, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal reading ObjectValue", err) } - return []any{ - simpleReadings[i].Id, - eventId, - simpleReadings[i].DeviceName, - simpleReadings[i].ProfileName, - simpleReadings[i].ResourceName, - simpleReadings[i].Origin, - simpleReadings[i].ValueType, - simpleReadings[i].Units, - tagsBytes, - simpleReadings[i].Value, - }, nil - }), - ) - } + } + return []any{ + r.Id, + eventId, + r.DeviceName, + r.ProfileName, + r.ResourceName, + r.Origin, + r.ValueType, + r.Units, + tagsBytes, + r.Value, + r.MediaType, + r.BinaryValue, + objectValueBytes, + }, nil + }), + ) if err != nil { - return pgClient.WrapDBError("failed to insert readings", err) + return pgClient.WrapDBError("failed to insert readings in batch", err) } return nil From bcda1b0a4a326ff38288216b5f25ce415e288b79 Mon Sep 17 00:00:00 2001 From: Lindsey Cheng Date: Fri, 16 Aug 2024 06:17:41 +0800 Subject: [PATCH 3/4] fix: Remove unused sql util function Remove unused sql util function to fix linter check. Signed-off-by: Lindsey Cheng --- internal/pkg/infrastructure/postgres/sql.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/internal/pkg/infrastructure/postgres/sql.go b/internal/pkg/infrastructure/postgres/sql.go index 830e02a4ad..825b399609 100644 --- a/internal/pkg/infrastructure/postgres/sql.go +++ b/internal/pkg/infrastructure/postgres/sql.go @@ -56,7 +56,7 @@ func sqlInsertContent(table string) string { // return fmt.Sprintf("SELECT * FROM %s", table) //} -// sqlQueryAllByCol returns the SQL statement for selecting all rows from the table by the given columns +// sqlQueryFieldsByCol returns the SQL statement for selecting the given fields of rows from the table by the conditions composed of given columns func sqlQueryFieldsByCol(table string, fields []string, columns ...string) string { whereCondition := constructWhereCondition(columns...) queryFieldStr := strings.Join(fields, ", ") @@ -195,13 +195,6 @@ func sqlQueryCountByTimeRangeCol(table string, timeRangeCol string, arrayColName // return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE (content->'%s')::bigint >= $1 AND (content->'%s')::bigint <= $2", table, field, field) //} -// sqlQueryAllByCol returns the SQL statement for selecting all rows from the table by the given column in WHERE condition -func sqlQueryAllByCol(table string, columns ...string) string { - whereCondition := constructWhereCondition(columns...) - - return fmt.Sprintf("SELECT * FROM %s WHERE %s", table, whereCondition) -} - // sqlQueryFieldsByTimeRange returns the SQL statement for selecting fields from the table within the time range func sqlQueryFieldsByTimeRange(table string, fields []string, timeRangeCol string) string { queryFieldStr := strings.Join(fields, ", ") From a0b1dbc10a57c5fcef64e7aba4fc1dcfa9fd0c43 Mon Sep 17 00:00:00 2001 From: Lindsey Cheng Date: Fri, 16 Aug 2024 12:21:49 +0800 Subject: [PATCH 4/4] fix: Use type switch to get the reading type for inserting reading Use type switch to convert the reading interface type to reading structs. Signed-off-by: Lindsey Cheng --- .../pkg/infrastructure/postgres/reading.go | 46 ++++++------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/internal/pkg/infrastructure/postgres/reading.go b/internal/pkg/infrastructure/postgres/reading.go index fc55aa4b5a..138efac5b2 100644 --- a/internal/pkg/infrastructure/postgres/reading.go +++ b/internal/pkg/infrastructure/postgres/reading.go @@ -14,7 +14,6 @@ import ( pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres" dbModels "github.com/edgexfoundry/edgex-go/internal/pkg/infrastructure/postgres/models" - "github.com/edgexfoundry/go-mod-core-contracts/v3/common" "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" model "github.com/edgexfoundry/go-mod-core-contracts/v3/models" @@ -237,9 +236,7 @@ func queryReadings(ctx context.Context, connPool *pgxpool.Pool, sql string, args // convert the BaseReading fields to BaseReading struct defined in contract baseReading := readingDBModel.GetBaseReading() - valueType := baseReading.ValueType - - if valueType == common.ValueTypeBinary { + if readingDBModel.BinaryValue != nil { // reading type is BinaryReading binaryReading := model.BinaryReading{ BaseReading: baseReading, @@ -247,20 +244,22 @@ func queryReadings(ctx context.Context, connPool *pgxpool.Pool, sql string, args BinaryValue: readingDBModel.BinaryValue, } reading = binaryReading - } else if baseReading.ValueType == common.ValueTypeObject { + } else if readingDBModel.ObjectValue != nil { // reading type is ObjectReading objReading := model.ObjectReading{ BaseReading: baseReading, ObjectValue: readingDBModel.ObjectValue, } reading = objReading - } else { + } else if readingDBModel.Value != nil { // reading type is SimpleReading simpleReading := model.SimpleReading{ BaseReading: baseReading, Value: *readingDBModel.Value, } reading = simpleReading + } else { + return reading, errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to none of BinaryReading/ObjectReading/SimpleReading structs", nil) } return reading, nil @@ -318,48 +317,33 @@ func addReadingsInTx(tx pgx.Tx, readings []model.Reading, eventId string) error for _, r := range readings { baseReading := r.GetBaseReading() var readingDBModel dbModels.Reading - valueType := baseReading.ValueType - if valueType == common.ValueTypeBinary { - // convert reading to BinaryReading struct - b, ok := r.(model.BinaryReading) - if !ok { - return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to BinaryReading model", nil) - } + switch contractReadingModel := r.(type) { + case model.BinaryReading: // convert BinaryReading struct to Reading DB model readingDBModel = dbModels.Reading{ BaseReading: baseReading, BinaryReading: dbModels.BinaryReading{ - BinaryValue: b.BinaryValue, - MediaType: &b.MediaType, + BinaryValue: contractReadingModel.BinaryValue, + MediaType: &contractReadingModel.MediaType, }, } - } else if valueType == common.ValueTypeObject { - // convert reading to ObjectReading struct - o, ok := r.(model.ObjectReading) - if !ok { - return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to ObjectReading model", nil) - } - + case model.ObjectReading: // convert ObjectReading struct to Reading DB model readingDBModel = dbModels.Reading{ BaseReading: baseReading, ObjectReading: dbModels.ObjectReading{ - ObjectValue: o.ObjectValue, + ObjectValue: contractReadingModel.ObjectValue, }, } - } else { - // convert reading to SimpleReading struct - s, ok := r.(model.SimpleReading) - if !ok { - return errors.NewCommonEdgeX(errors.KindServerError, "failed to convert reading to SimpleReading model", nil) - } - + case model.SimpleReading: // convert SimpleReading struct to Reading DB model readingDBModel = dbModels.Reading{ BaseReading: baseReading, - SimpleReading: dbModels.SimpleReading{Value: &s.Value}, + SimpleReading: dbModels.SimpleReading{Value: &contractReadingModel.Value}, } + default: + return errors.NewCommonEdgeX(errors.KindContractInvalid, "failed to convert reading to none of BinaryReading/ObjectReading/SimpleReading structs", nil) } readingDBModels = append(readingDBModels, readingDBModel) }