Skip to content

Commit 7f1d492

Browse files
authored
feat: Enhance LatestScheduleActionRecordsByJobName API (edgexfoundry#4871)
* feat: Enhance LatestScheduleActionRecordsByJobName API This is the seventh PR of edgexfoundry#4834 - Added ID to ScheduleAction to enhance the LatestScheduleActionRecordsByJobName API - Modified the schedule_action_record table Signed-off-by: Jack Chen <[email protected]> * feat: Update Doc for Cron Scheduler Signed-off-by: Jack Chen <[email protected]> * fix: Address Review Comment Signed-off-by: Jack Chen <[email protected]> --------- Signed-off-by: Jack Chen <[email protected]>
1 parent dcba1ae commit 7f1d492

File tree

17 files changed

+485
-474
lines changed

17 files changed

+485
-474
lines changed

cmd/support-cron-scheduler/res/db/sql/01-tables.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ CREATE TABLE IF NOT EXISTS scheduler.schedule_job (
1313
);
1414

1515
-- scheduler.schedule_action_record is used to store the schedule action record
16-
-- Note: All the records belong to the same job should have the same created time.
1716
CREATE TABLE IF NOT EXISTS scheduler.schedule_action_record (
1817
id UUID PRIMARY KEY,
18+
action_id UUID NOT NULL,
1919
job_name TEXT NOT NULL,
2020
action JSONB NOT NULL,
2121
status TEXT NOT NULL,
2222
scheduled_at timestamp NOT NULL,
23-
created timestamp NOT NULL
23+
created timestamp NOT NULL DEFAULT now()
2424
);

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/eclipse/paho.mqtt.golang v1.5.0
77
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50
88
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12
9-
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38
9+
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.39
1010
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31
1111
github.com/edgexfoundry/go-mod-secrets/v3 v3.2.0-dev.9
1212
github.com/fxamacker/cbor/v2 v2.7.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50 h1:i6VyieS5P7olGlhG1Wt
8888
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.2.0-dev.50/go.mod h1:oOuvWXdu6YaB2J17pe4X0ey66AZFyTzOmAZDQxPGGmM=
8989
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12 h1:JGZ9fsyCZOgbNkg+qdW9JN63NKIEX95v5zJhCVdlp10=
9090
github.com/edgexfoundry/go-mod-configuration/v3 v3.2.0-dev.12/go.mod h1:v7CvWGVmTh8dKItDNtfdBnYTeLhfZP5YmFiLsGJL9KU=
91-
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38 h1:tLhD+x0wvvMr11KFof7E95pKMBuntxCYjwC6XEhCJxg=
92-
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.38/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
91+
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.39 h1:z6E1KOAzHyFR/87A+0CxYlui0kq6tz7yctS6dXcowk0=
92+
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.2.0-dev.39/go.mod h1:d/FCa9Djq/pb7RYGEEhrR7fnKo+JK5IQ2YGW4LIHAqE=
9393
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31 h1:mC0ZguoK8HjVxeD7dIiXRqKswM0y7gnPQJt1fLOh/v4=
9494
github.com/edgexfoundry/go-mod-messaging/v3 v3.2.0-dev.31/go.mod h1:gcHtufkjd6oa3ZLqfzp66bCyCPx8MZe8Pwzh+2ITFnw=
9595
github.com/edgexfoundry/go-mod-registry/v3 v3.2.0-dev.13 h1:LkaF2eOpSz4eUiGpah4a9r+cB/A0Pea3Nh7aTU9hlKs=

internal/pkg/db/postgres/utils.go

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"sort"
1616

1717
"github.com/jackc/pgerrcode"
18+
"github.com/jackc/pgx/v5"
1819
"github.com/jackc/pgx/v5/pgconn"
1920
"github.com/jackc/pgx/v5/pgxpool"
2021

@@ -139,6 +140,8 @@ func WrapDBError(message string, err error) errors.EdgeX {
139140
return errors.NewCommonEdgeX(errors.KindDuplicateName, errMsg, nil)
140141
}
141142
return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil)
143+
} else if goErrors.Is(err, pgx.ErrNoRows) {
144+
return errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, message, err)
142145
}
143146
return errors.NewCommonEdgeX(errors.KindDatabaseError, message, err)
144147
}

internal/pkg/infrastructure/postgres/scheduleactionrecord.go

+32-45
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,34 @@ import (
2222

2323
const (
2424
scheduleActionRecordTable = "scheduler.schedule_action_record"
25+
actionIdCol = "action_id"
2526
jobNameCol = "job_name"
2627
actionCol = "action"
2728
scheduledAtCol = "scheduled_at"
2829
)
2930

3031
// AddScheduleActionRecord adds a new schedule action record to the database
31-
// Note: the created field should be set manually before calling this function, and all the records belong to the same job should have the same created time.
32-
// So that the created time can be used to query the latest schedule action records of a job.
32+
// Note: the scheduledAt field should be set manually before calling this function.
3333
func (c *Client) AddScheduleActionRecord(ctx context.Context, scheduleActionRecord model.ScheduleActionRecord) (model.ScheduleActionRecord, errors.EdgeX) {
3434
if len(scheduleActionRecord.Id) == 0 {
3535
scheduleActionRecord.Id = uuid.New().String()
3636
}
3737
return addScheduleActionRecord(ctx, c.ConnPool, scheduleActionRecord)
3838
}
3939

40+
// AddScheduleActionRecords adds multiple schedule action records to the database
41+
func (c *Client) AddScheduleActionRecords(ctx context.Context, scheduleActionRecords []model.ScheduleActionRecord) ([]model.ScheduleActionRecord, errors.EdgeX) {
42+
records := make([]model.ScheduleActionRecord, len(scheduleActionRecords))
43+
for _, record := range scheduleActionRecords {
44+
r, err := c.AddScheduleActionRecord(ctx, record)
45+
if err != nil {
46+
return nil, errors.NewCommonEdgeXWrapper(err)
47+
}
48+
records = append(records, r)
49+
}
50+
return records, nil
51+
}
52+
4053
// AllScheduleActionRecords queries the schedule action records with the given range, offset, and limit
4154
func (c *Client) AllScheduleActionRecords(ctx context.Context, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) {
4255
var err errors.EdgeX
@@ -47,42 +60,32 @@ func (c *Client) AllScheduleActionRecords(ctx context.Context, start, end int64,
4760

4861
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllWithPaginationAndTimeRange(scheduleActionRecordTable), time.UnixMilli(start), time.UnixMilli(end), offset, limit)
4962
if err != nil {
50-
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all schedule action records", err)
63+
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all schedule action records", err)
5164
}
5265

5366
return records, nil
5467
}
5568

56-
// LatestScheduleActionRecords queries the latest schedule action records of all schedule jobs with the given offset and limit
57-
func (c *Client) LatestScheduleActionRecords(ctx context.Context, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) {
58-
// Get all the job names
59-
jobNames, err := queryScheduleJobNames(ctx, c.ConnPool)
60-
if err != nil {
61-
return nil, errors.NewCommonEdgeXWrapper(err)
62-
}
63-
69+
// LatestScheduleActionRecordsByJobName queries the latest schedule action records by job name
70+
func (c *Client) LatestScheduleActionRecordsByJobName(ctx context.Context, jobName string) ([]model.ScheduleActionRecord, errors.EdgeX) {
6471
sqlQueryLatestScheduleActionRecords := `
65-
SELECT id, job_name, action, status, scheduled_at, created
72+
SELECT id, action_id, job_name, action, status, scheduled_at, created
6673
FROM(
6774
SELECT *
6875
FROM (
6976
SELECT *,
70-
RANK() OVER (PARTITION BY job_name ORDER BY created DESC) AS rnk
77+
RANK() OVER (PARTITION BY job_name, action_id ORDER BY created DESC) AS rnk
7178
FROM scheduler.schedule_action_record
72-
WHERE job_name = ANY($1::text[])
79+
WHERE job_name = $1
7380
) subquery
7481
WHERE rnk = 1
7582
)
76-
ORDER BY job_name, created DESC
77-
OFFSET $2
78-
LIMIT $3;
83+
ORDER BY job_name, created DESC;
7984
`
8085

81-
// Pass the offset and limit here
82-
offset, limit = getValidOffsetAndLimit(offset, limit)
83-
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryLatestScheduleActionRecords, jobNames, offset, limit)
86+
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryLatestScheduleActionRecords, jobName)
8487
if err != nil {
85-
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query latest schedule action records", err)
88+
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query latest schedule action records", err)
8689
}
8790

8891
return records, nil
@@ -98,7 +101,7 @@ func (c *Client) ScheduleActionRecordsByStatus(ctx context.Context, status strin
98101

99102
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByStatusWithPaginationAndTimeRange(scheduleActionRecordTable), status, time.UnixMilli(start), time.UnixMilli(end), offset, limit)
100103
if err != nil {
101-
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query schedule action records by status %s", status), err)
104+
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records by status %s", status), err)
102105
}
103106

104107
return records, nil
@@ -114,7 +117,7 @@ func (c *Client) ScheduleActionRecordsByJobName(ctx context.Context, jobName str
114117

115118
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByColWithPaginationAndTimeRange(scheduleActionRecordTable, jobNameCol), jobName, time.UnixMilli(start), time.UnixMilli(end), offset, limit)
116119
if err != nil {
117-
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query schedule action records by job name %s", jobName), err)
120+
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records by job name %s", jobName), err)
118121
}
119122

120123
return records, nil
@@ -130,7 +133,7 @@ func (c *Client) ScheduleActionRecordsByJobNameAndStatus(ctx context.Context, jo
130133

131134
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllByColWithPaginationAndTimeRange(scheduleActionRecordTable, jobNameCol, statusCol), jobName, status, time.UnixMilli(start), time.UnixMilli(end), offset, limit)
132135
if err != nil {
133-
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query schedule action records by job name %s and status %s", jobName, status), err)
136+
return nil, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records by job name %s and status %s", jobName, status), err)
134137
}
135138

136139
return records, nil
@@ -141,23 +144,6 @@ func (c *Client) ScheduleActionRecordTotalCount(ctx context.Context) (uint32, er
141144
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCount(scheduleActionRecordTable))
142145
}
143146

144-
// LatestScheduleActionRecordTotalCount returns the total count of all the latest schedule action records
145-
func (c *Client) LatestScheduleActionRecordTotalCount(ctx context.Context) (uint32, errors.EdgeX) {
146-
sqlQueryLatestScheduleActionRecordCount := `
147-
SELECT COUNT(*)
148-
FROM (
149-
SELECT *
150-
FROM (
151-
SELECT *,
152-
RANK() OVER (PARTITION BY job_name ORDER BY created DESC) AS rnk
153-
FROM scheduler.schedule_action_record
154-
)
155-
WHERE rnk = 1
156-
)
157-
`
158-
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryLatestScheduleActionRecordCount)
159-
}
160-
161147
// ScheduleActionRecordCountByStatus returns the total count of the schedule action records by status
162148
func (c *Client) ScheduleActionRecordCountByStatus(ctx context.Context, status string) (uint32, errors.EdgeX) {
163149
return getTotalRowsCount(ctx, c.ConnPool, sqlQueryCountByCol(scheduleActionRecordTable, statusCol), status)
@@ -190,13 +176,13 @@ func addScheduleActionRecord(ctx context.Context, connPool *pgxpool.Pool, schedu
190176

191177
_, err = connPool.Exec(
192178
ctx,
193-
sqlInsert(scheduleActionRecordTable, idCol, jobNameCol, actionCol, statusCol, scheduledAtCol, createdCol),
179+
sqlInsert(scheduleActionRecordTable, idCol, actionIdCol, jobNameCol, actionCol, statusCol, scheduledAtCol),
194180
scheduleActionRecord.Id,
181+
copiedScheduleAction.GetBaseScheduleAction().Id,
195182
scheduleActionRecord.JobName,
196183
actionJSONBytes,
197184
scheduleActionRecord.Status,
198-
time.UnixMilli(scheduleActionRecord.ScheduledAt).UTC(),
199-
time.UnixMilli(scheduleActionRecord.Created).UTC())
185+
time.UnixMilli(scheduleActionRecord.ScheduledAt).UTC())
200186
if err != nil {
201187
return scheduleActionRecord, pgClient.WrapDBError("failed to insert schedule action record", err)
202188
}
@@ -213,10 +199,11 @@ func queryScheduleActionRecords(ctx context.Context, connPool *pgxpool.Pool, sql
213199

214200
var scheduleActionRecords []model.ScheduleActionRecord
215201
for rows.Next() {
202+
var actionId string
216203
var record model.ScheduleActionRecord
217204
var created, scheduledAt time.Time
218205
var actionJSONBytes []byte
219-
err := rows.Scan(&record.Id, &record.JobName, &actionJSONBytes, &record.Status, &scheduledAt, &created)
206+
err := rows.Scan(&record.Id, &actionId, &record.JobName, &actionJSONBytes, &record.Status, &scheduledAt, &created)
220207
if err != nil {
221208
return nil, pgClient.WrapDBError("failed to scan schedule action record", err)
222209
}

internal/pkg/infrastructure/postgres/schedulejob.go

+3-27
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (c *Client) AllScheduleJobs(ctx context.Context, offset, limit int) ([]mode
3535
offset, limit = getValidOffsetAndLimit(offset, limit)
3636
jobs, err := queryScheduleJobs(ctx, c.ConnPool, sqlQueryAllWithPagination(scheduleJobTable), offset, limit)
3737
if err != nil {
38-
return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all schedule jobs", err)
38+
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to query all schedule jobs", err)
3939
}
4040

4141
return jobs, nil
@@ -71,7 +71,7 @@ func (c *Client) DeleteScheduleJobByName(ctx context.Context, name string) error
7171
func (c *Client) ScheduleJobById(ctx context.Context, id string) (model.ScheduleJob, errors.EdgeX) {
7272
scheduleJob, err := queryScheduleJob(ctx, c.ConnPool, sqlQueryAllById(scheduleJobTable), id)
7373
if err != nil {
74-
return scheduleJob, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query schedule job by id %s", id), err)
74+
return scheduleJob, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule job by id %s", id), err)
7575
}
7676

7777
return scheduleJob, nil
@@ -81,7 +81,7 @@ func (c *Client) ScheduleJobById(ctx context.Context, id string) (model.Schedule
8181
func (c *Client) ScheduleJobByName(ctx context.Context, name string) (model.ScheduleJob, errors.EdgeX) {
8282
scheduleJob, err := queryScheduleJob(ctx, c.ConnPool, sqlQueryAllByName(scheduleJobTable), name)
8383
if err != nil {
84-
return scheduleJob, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query schedule job by name %s", name), err)
84+
return scheduleJob, errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule job by name %s", name), err)
8585
}
8686

8787
return scheduleJob, nil
@@ -200,30 +200,6 @@ func queryScheduleJobs(ctx context.Context, connPool *pgxpool.Pool, sql string,
200200
return scheduleJobs, nil
201201
}
202202

203-
func queryScheduleJobNames(ctx context.Context, connPool *pgxpool.Pool) ([]string, errors.EdgeX) {
204-
sqlQueryAllScheduleJobNames := fmt.Sprintf("SELECT name FROM %s ORDER BY created", scheduleJobTable)
205-
rows, err := connPool.Query(ctx, sqlQueryAllScheduleJobNames)
206-
if err != nil {
207-
return nil, pgClient.WrapDBError("failed to query all schedule jobs' names", err)
208-
}
209-
defer rows.Close()
210-
211-
var names []string
212-
for rows.Next() {
213-
var name string
214-
err := rows.Scan(&name)
215-
if err != nil {
216-
return nil, pgClient.WrapDBError("failed to scan schedule job name", err)
217-
}
218-
names = append(names, name)
219-
}
220-
221-
if readErr := rows.Err(); readErr != nil {
222-
return nil, pgClient.WrapDBError("error occurred while query scheduler.schedule_job table", readErr)
223-
}
224-
return names, nil
225-
}
226-
227203
func toScheduleJobsModel(scheduleJobs model.ScheduleJob, scheduleJobJSONBytes []byte) (model.ScheduleJob, errors.EdgeX) {
228204
var storedJob model.ScheduleJob
229205
if err := json.Unmarshal(scheduleJobJSONBytes, &storedJob); err != nil {

0 commit comments

Comments
 (0)