Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve queries after LogStructured removal #195

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 55 additions & 32 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ func init() {
}

var (
columns = "kv.id as theid, kv.name, kv.created, kv.deleted, kv.create_revision, kv.prev_revision, kv.lease, kv.value, kv.old_value"

revSQL = `
SELECT MAX(rkv.id) AS id
FROM kine AS rkv`

listSQL = fmt.Sprintf(`
SELECT %s
listSQL = `
SELECT kv.id,
name,
CASE WHEN kv.created THEN kv.id ELSE kv.create_revision END AS create_revision,
lease,
value
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
Expand All @@ -97,9 +99,8 @@ var (
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE (kv.deleted = 0 OR ?)
ORDER BY kv.name ASC, kv.id ASC
`, columns)
WHERE kv.deleted = 0
ORDER BY kv.name ASC, kv.id ASC`

revisionIntervalSQL = `
SELECT (
Expand All @@ -111,28 +112,46 @@ var (
FROM kine
) AS high`

listRevisionStartSQL = listSQL

countRevisionSQL = fmt.Sprintf(`
countRevisionSQL = `
SELECT COUNT(*)
FROM (
%s
)`, listSQL)

afterSQLPrefix = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE
kv.name >= ? AND kv.name < ?
AND kv.id > ?
ORDER BY kv.id ASC`, columns)

afterSQL = fmt.Sprintf(`
SELECT %s
FROM kine AS kv
WHERE kv.id > ?
ORDER BY kv.id ASC
`, columns)
JOIN (
SELECT MAX(mkv.id) as id
FROM kine AS mkv
WHERE
mkv.name >= ? AND mkv.name < ?
AND mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE kv.deleted = 0`

afterSQLPrefix = `
SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
FROM kine
WHERE name >= ? AND name < ?
AND id > ?
ORDER BY id ASC`

afterSQL = `
SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value
FROM kine
WHERE id > ?
ORDER BY id ASC`

ttlSQL = `
SELECT kv.id,
name,
lease
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) as id
FROM kine AS mkv
WHERE mkv.id <= ?
GROUP BY mkv.name
) AS maxkv
ON maxkv.id = kv.id
WHERE kv.deleted = 0 AND kv.lease != 0`

deleteRevSQL = `
DELETE FROM kine
Expand Down Expand Up @@ -408,7 +427,7 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
if startKey != "" {
start = startKey + "\x01"
}
rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision, false)
rows, err := d.query(ctx, "count_revision", countRevisionSQL, start, end, revision)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -651,17 +670,21 @@ func (d *Generic) DeleteRevision(ctx context.Context, revision int64) error {
return err
}

func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) {
func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error) {
start, end := getPrefixRange(prefix)
if startKey != "" {
start = startKey + "\x01"
}
sql := listRevisionStartSQL
sql := listSQL
if limit > 0 {
sql = fmt.Sprintf("%s LIMIT ?", sql)
return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, includeDeleted, limit)
return d.query(ctx, "list_revision_start_sql_limit", sql, start, end, revision, limit)
}
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision, includeDeleted)
return d.query(ctx, "list_revision_start_sql", sql, start, end, revision)
}

func (d *Generic) ListTTL(ctx context.Context, revision int64) (*sql.Rows, error) {
return d.query(ctx, "ttl_sql", ttlSQL, revision)
}

func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down
50 changes: 23 additions & 27 deletions pkg/kine/sqllog/sqllog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func init() {
}

type Dialect interface {
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
List(ctx context.Context, prefix, startKey string, limit, revision int64) (*sql.Rows, error)
ListTTL(ctx context.Context, revision int64) (*sql.Rows, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this as a separate function on the dialect, it not only makes the queries more readable but also reduces the amount of things we return.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also reflects nicely in the benchmark :) Thanks!

Count(ctx context.Context, prefix, startKey string, revision int64) (int64, error)
CurrentRevision(ctx context.Context) (int64, error)
AfterPrefix(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
Expand Down Expand Up @@ -286,7 +287,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
startKey = ""
}

rows, err := s.d.List(ctx, prefix, startKey, limit, revision, false)
rows, err := s.d.List(ctx, prefix, startKey, limit, revision)
if err != nil {
return 0, nil, err
}
Expand All @@ -313,23 +314,31 @@ func (s *SQLLog) ttl(ctx context.Context) {
go func() {
defer s.wg.Done()

rev, kvs, err := s.List(ctx, "/", "", 1000, 0)
for len(kvs) > 0 {
if err != nil {
startRevision, err := s.d.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

rows, err := s.d.ListTTL(ctx, startRevision)
if err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

var (
key string
revision, lease int64
)
for rows.Next() {
if err := rows.Scan(&revision, &key, &lease); err != nil {
logrus.Errorf("failed to read old events for ttl: %v", err)
return
}

for _, kv := range kvs {
if kv.Lease > 0 {
go run(ctx, kv.Key, kv.ModRevision, time.Duration(kv.Lease)*time.Second)
}
}

_, kvs, err = s.List(ctx, "/", kvs[len(kvs)-1].Key, 1000, rev)
go run(ctx, key, revision, time.Duration(lease)*time.Second)
}

watchCh, err := s.Watch(ctx, "/", rev)
watchCh, err := s.Watch(ctx, "/", startRevision)
if err != nil {
logrus.Errorf("failed to watch events for ttl: %v", err)
return
Expand Down Expand Up @@ -654,29 +663,16 @@ func ScanAll[T any](rows *sql.Rows, scanOne func(*sql.Rows) (T, error)) ([]T, er

func scanKeyValue(rows *sql.Rows) (*server.KeyValue, error) {
kv := &server.KeyValue{}
var create, delete bool
var prevRevision int64
var prevValue []byte

err := rows.Scan(
&kv.ModRevision,
&kv.Key,
&create,
&delete,
&kv.CreateRevision,
&prevRevision,
&kv.Lease,
&kv.Value,
&prevValue,
)
if err != nil {
return nil, err
}

if create {
kv.CreateRevision = kv.ModRevision
}

return kv, nil
}

Expand Down
Loading