diff --git a/pkg/kine/server/watch.go b/pkg/kine/server/watch.go index 761fe653..e7074b70 100644 --- a/pkg/kine/server/watch.go +++ b/pkg/kine/server/watch.go @@ -41,7 +41,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { } if cr := msg.GetCancelRequest(); cr != nil { logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) - w.Cancel(cr.WatchId, nil, ws.Context()) + w.Cancel(cr.WatchId, nil) } if pr := msg.GetProgressRequest(); pr != nil { w.Progress(ws.Context()) @@ -51,11 +51,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { // pollProgressNotify periodically sends progress notifications to all watchers. func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) { - ch := make(chan struct{}, 1) - go func() { - defer close(ch) - tick := time.NewTicker(interval) defer tick.Stop() @@ -64,15 +60,8 @@ func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration case <-ctx.Done(): return case <-tick.C: - // Skip this tick if ProgressIfSynced is still running. - select { - case ch <- struct{}{}: - if err := w.ProgressIfSynced(ctx); err != nil { - logrus.Errorf("Failed to send progress notification: %v", err) - } - <-ch - default: - logrus.Warn("Skipping progress notification: still busy.") + if err := w.ProgressIfSynced(ctx); err != nil { + logrus.Errorf("Failed to send progress notification: %v", err) } } } @@ -122,14 +111,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) Created: true, WatchId: id, }); err != nil { - w.Cancel(id, err, ctx) + w.Cancel(id, err) return } watchCh, err := w.backend.Watch(ctx, key, startRevision) if err != nil { logrus.Errorf("Failed to start watch: %v", err) - w.Cancel(id, err, ctx) + w.Cancel(id, err) return } @@ -184,7 +173,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) } logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads) if err := w.server.Send(wr); err != nil { - w.Cancel(id, err, ctx) + w.Cancel(id, err) } } } @@ -216,7 +205,7 @@ func toEvent(event *Event) *mvccpb.Event { return e } -func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) { +func (w *watcher) Cancel(watchID int64, err error) { w.Lock() if progressCh, ok := w.progress[watchID]; ok { close(progressCh) @@ -236,10 +225,9 @@ func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) { if err == ErrCompacted { // the requested start revision is compacted. Pass the current and and compact // revision to the client via the cancel response, along with the correct error message. - compactRev, revision, err = w.backend.GetCompactRevision(ctx) + compactRev, revision, err = w.backend.GetCompactRevision(w.server.Context()) if err != nil { logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err) - compactRev = 0 } } } diff --git a/pkg/kine/sqllog/sqllog.go b/pkg/kine/sqllog/sqllog.go index 6535a1b3..b4efa9ce 100644 --- a/pkg/kine/sqllog/sqllog.go +++ b/pkg/kine/sqllog/sqllog.go @@ -193,7 +193,7 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) { // small batches. Given that this logic runs every second, // on regime it should take usually just a couple batches // to keep the pace. - start, target, err := s.GetCompactRevision(ctx) + start, target, err := s.d.GetCompactRevision(ctx) if err != nil { return err } @@ -238,7 +238,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64 attribute.Int64("limit", limit), ) - compactRevision, currentRevision, err := s.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -275,7 +275,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) if err != nil { return 0, nil, err } @@ -445,7 +445,7 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) { return nil, err } - pollStart, _, err := s.GetCompactRevision(ctx) + pollStart, _, err := s.d.GetCompactRevision(ctx) if err != nil { return nil, err } @@ -603,7 +603,7 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in attribute.Int64("revision", revision), ) - compactRevision, currentRevision, err := s.GetCompactRevision(ctx) + compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx) if err != nil { return 0, 0, err }