From c0f5db48ed2234596491552b7e8494a3eb558bc8 Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Tue, 29 Oct 2024 16:19:57 +0100 Subject: [PATCH 1/2] Improve list query --- pkg/kine/drivers/generic/generic.go | 41 ++++++++++++++++++----------- pkg/kine/sqllog/sqllog.go | 17 ++---------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index e75323f6..cd7ce0c6 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -85,8 +85,12 @@ var ( SELECT MAX(rkv.id) AS id FROM kine AS rkv` - listSQL = fmt.Sprintf(` - SELECT %s + listSQL = ` + SELECT kv.id, + name, + CASE WHEN kv.created THEN kv.id ELSE kv.create_revision END AS create_revision, + lease, + value FROM kine AS kv JOIN ( SELECT MAX(mkv.id) as id @@ -97,9 +101,8 @@ var ( GROUP BY mkv.name ) AS maxkv ON maxkv.id = kv.id - WHERE (kv.deleted = 0 OR ?) - ORDER BY kv.name ASC, kv.id ASC - `, columns) + WHERE kv.deleted = 0 + ORDER BY kv.name ASC, kv.id ASC` revisionIntervalSQL = ` SELECT ( @@ -111,13 +114,19 @@ var ( FROM kine ) AS high` - listRevisionStartSQL = listSQL - - countRevisionSQL = fmt.Sprintf(` + countRevisionSQL = ` SELECT COUNT(*) - FROM ( - %s - )`, listSQL) + FROM kine AS kv + JOIN ( + SELECT MAX(mkv.id) as id + FROM kine AS mkv + WHERE + mkv.name >= ? AND mkv.name < ? + AND mkv.id <= ? + GROUP BY mkv.name + ) AS maxkv + ON maxkv.id = kv.id + WHERE kv.deleted = 0` afterSQLPrefix = fmt.Sprintf(` SELECT %s @@ -408,7 +417,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i if startKey != "" { start = startKey + "\x01" } - rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision, false) + rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision) if err != nil { return 0, err } @@ -651,17 +660,17 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error { return err } -func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) { +func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) { start, end := getPrefixRange(prefix) if startKey != "" { start = startKey + "\x01" } - sql := listRevisionStartSQL + sql := listSQL if limit > 0 { sql = fmt.Sprintf("%s LIMIT ?", sql) - return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, includeDeleted, limit) + return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, limit) } - return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted) + return d.query(ctx, "list_revision_start_sql", sql, start, end, revision) } func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 83468230..94ecb002 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -42,7 +42,7 @@ func init() { } type Dialect interface { - List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) + List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, error) CurrentRevision(ctx context.Context) (int64, error) AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) @@ -286,7 +286,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis startKey = "" } - rows, err := s.d.List(ctx, prefix, startKey, limit, revision, false) + rows, err := s.d.List(ctx, prefix, startKey, limit, revision) if err != nil { return 0, nil, err } @@ -654,29 +654,16 @@ func ScanAll[T any](rows *sql.Rows, scanOne func(*sql.Rows) (T, error)) ([]T, er func scanKeyValue(rows *sql.Rows) (*server.KeyValue, error) { kv := &server.KeyValue{} - var create, delete bool - var prevRevision int64 - var prevValue []byte - err := rows.Scan( &kv.ModRevision, &kv.Key, - &create, - &delete, &kv.CreateRevision, - &prevRevision, &kv.Lease, &kv.Value, - &prevValue, ) if err != nil { return nil, err } - - if create { - kv.CreateRevision = kv.ModRevision - } - return kv, nil } From 95524968849d99176ace092c38fba0106cfa4322 Mon Sep 17 00:00:00 2001 From: Marco Manino Date: Tue, 29 Oct 2024 16:37:43 +0100 Subject: [PATCH 2/2] Add TTL query to speed up startup --- pkg/kine/drivers/generic/generic.go | 44 +++++++++++++++++++---------- pkg/kine/sqllog/sqllog.go | 33 ++++++++++++++-------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/pkg/kine/drivers/generic/generic.go b/pkg/kine/drivers/generic/generic.go index cd7ce0c6..27dfe581 100644 --- a/pkg/kine/drivers/generic/generic.go +++ b/pkg/kine/drivers/generic/generic.go @@ -79,8 +79,6 @@ func init() { } var ( - columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value" - revSQL = ` SELECT MAX(rkv.id) AS id FROM kine AS rkv` @@ -128,20 +126,32 @@ var ( ON maxkv.id = kv.id WHERE kv.deleted = 0` - afterSQLPrefix = fmt.Sprintf(` - SELECT %s + afterSQLPrefix = ` + SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value + FROM kine + WHERE name >= ? AND name < ? + AND id > ? + ORDER BY id ASC` + + afterSQL = ` + SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value + FROM kine + WHERE id > ? + ORDER BY id ASC` + + ttlSQL = ` + SELECT kv.id, + name, + lease FROM kine AS kv - WHERE - kv.name >= ? AND kv.name < ? - AND kv.id > ? - ORDER BY kv.id ASC`, columns) - - afterSQL = fmt.Sprintf(` - SELECT %s - FROM kine AS kv - WHERE kv.id > ? - ORDER BY kv.id ASC - `, columns) + JOIN ( + SELECT MAX(mkv.id) as id + FROM kine AS mkv + WHERE mkv.id <= ? + GROUP BY mkv.name + ) AS maxkv + ON maxkv.id = kv.id + WHERE kv.deleted = 0 AND kv.lease != 0` deleteRevSQL = ` DELETE FROM kine @@ -673,6 +683,10 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi return d.query(ctx, "list_revision_start_sql", sql, start, end, revision) } +func (d *Generic) ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) { + return d.query(ctx, "ttl_sql", ttlSQL, revision) +} + func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) { var id int64 var err error diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 94ecb002..9cb9b8b7 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -43,6 +43,7 @@ func init() { type Dialect interface { List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) + ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) Count(ctx context.Context, prefix, startKey string, revision int64) (int64, error) CurrentRevision(ctx context.Context) (int64, error) AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) @@ -313,23 +314,31 @@ func (s *SQLLog) ttl(ctx context.Context) { go func() { defer s.wg.Done() - rev, kvs, err := s.List(ctx, "/", "", 1000, 0) - for len(kvs) > 0 { - if err != nil { + startRevision, err := s.d.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("failed to read old events for ttl: %v", err) + return + } + + rows, err := s.d.ListTTL(ctx, startRevision) + if err != nil { + logrus.Errorf("failed to read old events for ttl: %v", err) + return + } + + var ( + key string + revision, lease int64 + ) + for rows.Next() { + if err := rows.Scan(&revision, &key, &lease); err != nil { logrus.Errorf("failed to read old events for ttl: %v", err) return } - - for _, kv := range kvs { - if kv.Lease > 0 { - go run(ctx, kv.Key, kv.ModRevision, time.Duration(kv.Lease)*time.Second) - } - } - - _, kvs, err = s.List(ctx, "/", kvs[len(kvs)-1].Key, 1000, rev) + go run(ctx, key, revision, time.Duration(lease)*time.Second) } - watchCh, err := s.Watch(ctx, "/", rev) + watchCh, err := s.Watch(ctx, "/", startRevision) if err != nil { logrus.Errorf("failed to watch events for ttl: %v", err) return