Skip to content

Commit

Permalink
little nits
Browse files Browse the repository at this point in the history
  • Loading branch information
louiseschmidtgen committed Dec 20, 2024
1 parent f89286f commit bd13cd8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 25 deletions.
28 changes: 8 additions & 20 deletions pkg/kine/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
}
if cr := msg.GetCancelRequest(); cr != nil {
logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId)
w.Cancel(cr.WatchId, nil, ws.Context())
w.Cancel(cr.WatchId, nil)
}
if pr := msg.GetProgressRequest(); pr != nil {
w.Progress(ws.Context())
Expand All @@ -51,11 +51,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {

// pollProgressNotify periodically sends progress notifications to all watchers.
func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) {
ch := make(chan struct{}, 1)

go func() {
defer close(ch)

tick := time.NewTicker(interval)
defer tick.Stop()

Expand All @@ -64,15 +60,8 @@ func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration
case <-ctx.Done():
return
case <-tick.C:
// Skip this tick if ProgressIfSynced is still running.
select {
case ch <- struct{}{}:
if err := w.ProgressIfSynced(ctx); err != nil {
logrus.Errorf("Failed to send progress notification: %v", err)
}
<-ch
default:
logrus.Warn("Skipping progress notification: still busy.")
if err := w.ProgressIfSynced(ctx); err != nil {
logrus.Errorf("Failed to send progress notification: %v", err)
}
}
}
Expand Down Expand Up @@ -122,14 +111,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
Created: true,
WatchId: id,
}); err != nil {
w.Cancel(id, err, ctx)
w.Cancel(id, err)
return
}

watchCh, err := w.backend.Watch(ctx, key, startRevision)
if err != nil {
logrus.Errorf("Failed to start watch: %v", err)
w.Cancel(id, err, ctx)
w.Cancel(id, err)
return
}

Expand Down Expand Up @@ -184,7 +173,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
}
logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads)
if err := w.server.Send(wr); err != nil {
w.Cancel(id, err, ctx)
w.Cancel(id, err)
}
}
}
Expand Down Expand Up @@ -216,7 +205,7 @@ func toEvent(event *Event) *mvccpb.Event {
return e
}

func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) {
func (w *watcher) Cancel(watchID int64, err error) {
w.Lock()
if progressCh, ok := w.progress[watchID]; ok {
close(progressCh)
Expand All @@ -236,10 +225,9 @@ func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) {
if err == ErrCompacted {
// the requested start revision is compacted. Pass the current and and compact
// revision to the client via the cancel response, along with the correct error message.
compactRev, revision, err = w.backend.GetCompactRevision(ctx)
compactRev, revision, err = w.backend.GetCompactRevision(w.server.Context())
if err != nil {
logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err)
compactRev = 0
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kine/sqllog/sqllog.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) {
// small batches. Given that this logic runs every second,
// on regime it should take usually just a couple batches
// to keep the pace.
start, target, err := s.GetCompactRevision(ctx)
start, target, err := s.d.GetCompactRevision(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64
attribute.Int64("limit", limit),
)

compactRevision, currentRevision, err := s.GetCompactRevision(ctx)
compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
attribute.Int64("revision", revision),
)

compactRevision, currentRevision, err := s.GetCompactRevision(ctx)
compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -445,7 +445,7 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) {
return nil, err
}

pollStart, _, err := s.GetCompactRevision(ctx)
pollStart, _, err := s.d.GetCompactRevision(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -603,7 +603,7 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in
attribute.Int64("revision", revision),
)

compactRevision, currentRevision, err := s.GetCompactRevision(ctx)
compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx)
if err != nil {
return 0, 0, err
}
Expand Down

0 comments on commit bd13cd8

Please sign in to comment.