Skip to content

Commit

Permalink
executor: handle OOM panic which not be recovered now in distSQL layer
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored May 20, 2019
1 parent b77b534 commit 7859443
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
6 changes: 6 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ func (r *selectResult) Fetch(ctx context.Context) {
func (r *selectResult) fetch(ctx context.Context) {
startTime := time.Now()
defer func() {
if c := recover(); c != nil {
err := fmt.Errorf("%v", c)
logutil.Logger(ctx).Error("OOM", zap.Error(err))
r.results <- resultWithErr{err: err}
}

close(r.results)
duration := time.Since(startTime)
metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds())
Expand Down
11 changes: 6 additions & 5 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) {
return
}

func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) {
if worker.memTracker != nil {
func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse, checkOOM bool) (exit bool) {
if worker.memTracker != nil && checkOOM {
worker.memTracker.Consume(int64(resp.MemSize()))
}
select {
Expand Down Expand Up @@ -617,15 +617,16 @@ func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh
zap.Reflect("r", r),
zap.Stack("stack trace"))
resp := &copResponse{err: errors.Errorf("%v", r)}
worker.sendToRespCh(resp, task.respChan)
// if panic has happened, set checkOOM to false to avoid another panic.
worker.sendToRespCh(resp, task.respChan, false)
}
}()
remainTasks := []*copTask{task}
for len(remainTasks) > 0 {
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
if err != nil {
resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh)
worker.sendToRespCh(resp, respCh, true)
return
}
if len(tasks) > 0 {
Expand Down Expand Up @@ -820,7 +821,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
}
}
}
worker.sendToRespCh(resp, ch)
worker.sendToRespCh(resp, ch, true)
return nil, nil
}

Expand Down

0 comments on commit 7859443

Please sign in to comment.