
Blocking function timeouts and cancellation not working #665

Closed
zeiler opened this issue Nov 9, 2024 · 25 comments

Comments

@zeiler
Contributor

zeiler commented Nov 9, 2024

Hello,

We've noticed that blocking functions like xreadgroup or BLPOP have a couple of issues.

If the ConnWriteTimeout setting is less than the timeout you want to block for in these calls, then they will only block for ConnWriteTimeout duration. We're not sure if it's safe to set this to something much higher than we'll ever need to.

Secondly, cancellation of the parent context doesn't seem to cancel these blocking calls. They appear to wait until their own timeouts to finish blocking (not the ConnWriteTimeout). There doesn't appear to be any way to stop them from the caller. Is that right?

Finally, they seem to return different errors than the default redis client when timing out, which has made porting to rueidis difficult. We think redis returns redis.Nil on timeout, while rueidis returns context.DeadlineExceeded. Even if canceling the parent context or exceeding its deadline worked, it would be confusing to distinguish a blocking operation timing out from the parent context expiring. What is expected to be returned when a) there are no messages on the stream xreadgroup is reading from and b) when it or BLPOP times out?

Thanks for the help!

@rueian
Collaborator

rueian commented Nov 9, 2024

Hi @zeiler,

If the ConnWriteTimeout setting is less than the timeout you want to block for in these calls, then they will only block for ConnWriteTimeout duration. We're not sure if it's safe to set this to something much higher than we'll ever need to.

This sounds like a bug, but I am unable to reproduce it with the following script:

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/redis/rueidis"
)

func main() {
	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:      []string{"127.0.0.1:6379"},
		ConnWriteTimeout: 10 * time.Millisecond,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(v string) {
			go func() {
				time.Sleep(time.Second * 20)
				c.Do(context.Background(), c.B().Lpush().Key(v).Element(v).Build())
			}()
			arr, err := c.Do(context.Background(), c.B().Blpop().Key(v).Timeout(30).Build()).AsStrSlice()
			if len(arr) == 0 {
				panic(err)
			}
			fmt.Println(arr)
			wg.Done()
		}(strconv.Itoa(i))
	}
	wg.Wait()
}

The above code works. Could you share how you made it happen? Also, there are two more things to know when working with blocking commands:

  1. If the context you pass in has a deadline, that deadline will be applied in the client.
  2. If you use the Arbitrary() command builder, you should complete the command building with .Blocking().

Secondly, cancellation of the parent context doesn't seem to cancel these blocking calls. They appear to wait until their timeouts to finish blocking (not the ConnWriteTimeout). It doesn't appear to be any way to stop them from the caller, is that right?

No, that is not correct. The underlying connection should be closed once a blocking call gets canceled.

What is expected to be returned when a) there is no messages on a stream xreadgroup is calling on and b) when it or BLPOP time out?

I am not sure what confusion you are referring to. To me, it seems very clear:

  1. If your context times out, you will receive a context.DeadlineExceeded.
  2. If the server unblocks the blocking command, you will receive a Nil message.

@patricklundquist

Per the docs for BLPOP, a timeout should return a Nil reply:
"Nil reply: no element could be popped and the timeout expired"
https://redis.io/docs/latest/commands/blpop/

@rueian
Collaborator

rueian commented Nov 9, 2024

Hi @patricklundquist,

Thank you for the reference. Yes, you will receive a Nil message from the server on a blocking call if the server times you out. This can happen in only two ways:

  1. You specified a non-zero timeout in your request, for example the last argument of BLPOP, and the timeout was reached on the server side.
  2. Your blocking call was unblocked early because someone sent a CLIENT UNBLOCK request to the server to unblock you.

That said, a context is a purely client-side thing and the server knows nothing about it. If you don't want context.DeadlineExceeded, just use context.Background() instead.

@zeiler
Contributor Author

zeiler commented Nov 9, 2024

I think I found the bug, and I can reproduce it with your code snippet by simply making a context with a deadline far into the future, like 100 seconds out, and using that as the context for the BLPOP. You'll see the err come back as context.DeadlineExceeded, and it returns after ConnWriteTimeout, not at the 100-second deadline of the context.

I think the bug is on this line https://github.com/redis/rueidis/blob/main/pipe.go#L1143, which is a section like this:

func (p *pipe) syncDo(dl time.Time, dlOk bool, cmd Completed) (resp RedisResult) {
	if dlOk {
		if p.timeout > 0 {
			defaultDeadline := time.Now().Add(p.timeout)
			if dl.After(defaultDeadline) {
				dl = defaultDeadline
			}
		}
		p.conn.SetDeadline(dl)
	} else if p.timeout > 0 && !cmd.IsBlock() {
		p.conn.SetDeadline(time.Now().Add(p.timeout))
	} else {
		p.conn.SetDeadline(time.Time{})
	}

it should use an if like this in the first case when there is a deadline on the context (ie. dlOk is true):

if defaultDeadline.After(dl) {
  dl = defaultDeadline
}

It's reversed in the code currently, so long context deadlines get shrunk to ConnWriteTimeout. If there is no deadline on the context then this branch isn't hit, which is why your code snippet works as-is!

I believe the second if is OK for non-blocking commands, waiting only ConnWriteTimeout, as perhaps that's its purpose? Or should that extend to the context deadline as well?

I think the third case is what your code snippet hits; it calls SetDeadline with the zero value, so it waits indefinitely.

This happens in a few places within pipe.go, it looks like; just searching for p.timeout shows them.

While we wait for a new version with the fix to be merged, I think we can avoid this for now by setting ConnWriteTimeout to 0, but I'm not sure what other implications that has. It would affect non-blocking calls too, like case 2.

Instead of 0, we could also set it to the highest deadline we set in our code plus 1 second.

I guess a third option could be to keep ConnWriteTimeout at something reasonable like the default 10 seconds, then wrap blocking calls in a select and use a background context on the blocking call:

select {
case <-ctx.Done():
	// handle the outer context deadline
default:
	// use a background context so only the BLPOP timeout applies (case 3 above)
	client.Do(context.Background(), ...)
}

Not sure which is better.

@rueian
Collaborator

rueian commented Nov 9, 2024

Hi @zeiler,

I think the bug is on this line https://github.com/redis/rueidis/blob/main/pipe.go#L1143, which is a section like this:

Thanks for finding the bug! And you are right, the shortening of the deadline is meant to be applied only to non-blocking commands. So I think the correct fix should be:

// syncDo
	if dlOk {
		if p.timeout > 0 && !cmd.IsBlock() {
			defaultDeadline := time.Now().Add(p.timeout)
			if dl.After(defaultDeadline) {
				dl = defaultDeadline
			}
		}
		p.conn.SetDeadline(dl)
	}

// syncDoMulti
	if dlOk {
		if p.timeout > 0 {
			for _, cmd := range multi {
				if cmd.IsBlock() {
					p.conn.SetDeadline(dl)
					goto process
				}
			}
			defaultDeadline := time.Now().Add(p.timeout)
			if dl.After(defaultDeadline) {
				dl = defaultDeadline
			}
		}
		p.conn.SetDeadline(dl)
	}

Would you like to contribute? We can release a v1.0.50-alpha.1 quickly.

I think we can avoid this for now by setting ConnWriteTimeout to 0

Remember that if you set it to zero, the default value (10 seconds) of ConnWriteTimeout will be applied.

I guess a third option could be to keep ConnWriteTimeout at something reasonable like the default 10 seconds, then wrap blocking calls in a select and use a background context on the blocking call.

I don't really recommend using context deadlines on blocking calls to replace server timeouts. As I said, we will close the connection if the context deadline is exceeded, because its purpose is to prevent your client from getting stuck on a dropped network packet or a hanging server. So you will need to create new connections after timeouts. That's inefficient under normal conditions.

Therefore, if your application timeout is carried in the context, then I recommend this:

dl, _ := ctx.Deadline()
c.Do(context.Background(), c.B().Blpop().Key(key).Timeout(time.Until(dl).Seconds()).Build())

@zeiler
Contributor Author

zeiler commented Nov 9, 2024

Ah got it, but until the bug is fixed that would still be ConnWriteTimeout when it's a long deadline, right?

@rueian
Collaborator

rueian commented Nov 9, 2024

Ah got it, but until the bug is fixed that would still be ConnWriteTimeout when it's a long deadline, right?

Yes if you still pass the same ctx to the .Do().

@zeiler
Contributor Author

zeiler commented Nov 9, 2024

True. Opened PR #666; I think there were two more places that needed changing.

@rueian
Collaborator

rueian commented Nov 9, 2024

Thanks @zeiler, we have v1.0.50-alpha.1 released now.

@rueian rueian closed this as completed Nov 9, 2024
@zeiler
Contributor Author

zeiler commented Nov 10, 2024

@rueian I think there might be a second bug here. I confirmed that when canceling a context by calling its cancel() function, the blocking calls continue until their timeout. For example:

	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:      []string{"127.0.0.1:6379"},
		ConnWriteTimeout: 10 * time.Millisecond,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Test that canceling the context stops the blocking redis call; this should be a context.Canceled
	key4 := "keyabc"
	st := time.Now()
	// Long deadline.
	ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second)
	go func() {
		time.Sleep(time.Millisecond * 400)
		cancel3()
	}()
	// Long timeout so the server-side timeout fires after the cancel
	timeout := 3 * time.Second
	redisResult := c.Do(ctx3, c.B().Blpop().Key(key4).Timeout(timeout.Seconds()).Build())
	arr, err := redisResult.AsStrSlice()
	_ = arr
	duration := time.Since(st)
	// Somewhere between 400-500 milliseconds should be the cancellation, but I see 3+ seconds
	if duration >= 3*time.Second {
		panic("Surpassed the blocking timeout, not the cancellation")
	}
	if err != context.Canceled {
		panic("not a cancelled error")
	}

Both of those panics would be hit currently.

I'm not sure where in the code this could be, though. I do see this line https://github.com/redis/rueidis/blob/main/pipe.go#L1166 which handles a deadline exceeded, but I don't see any context cancellation being handled. Maybe I'm missing something?

@rueian
Collaborator

rueian commented Nov 10, 2024

Hi @zeiler,

We only handle ctx deadlines in the syncDo and syncDoMulti because the net.Conn API only accepts deadlines.

To handle the other ctx cancellation, we need additional goroutines to monitor the ctx and that is available in the pipeline mode. However, for blocking commands, we currently need 3 settings to make them handled in the pipelined mode:

  1. Set AlwaysPipelining: true.
  2. Use c.Dedicate() to acquire a dedicated connection.
  3. Use v1.0.50-alpha.2, which has a fix for recycling blocked connections: "fix: close connections that are still blocked while recycling them to pools" #667

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/rueidis" // try this with v1.0.50-alpha.2
)

func main() {
	c, err := rueidis.NewClient(rueidis.ClientOption{
		InitAddress:      []string{"127.0.0.1:6379"},
		AlwaysPipelining: true,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	cc, release := c.Dedicate()
	defer release()

	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(time.Second*3, cancel)
	fmt.Println(cc.Do(ctx, cc.B().Blpop().Key("key").Timeout(0).Build()).AsStrSlice())
}

The second step, c.Dedicate(), might not be necessary in future versions if we specifically route blocking commands to the dpool. Currently, they use the spool, where pipeline mode is always disabled:

In rueidis/mux.go, line 236 at commit b9b810e:

wire := m.spool.Acquire()

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

Aha! I just found the dpool vs spool part in mux.go.

I believe there is another bug here though: https://github.com/redis/rueidis/blob/main/pipe.go#L836 (and the same in DoMulti). I think that !ok should be ok. In the scenario where we use contexts that have a deadline, ok will be true, and we want to use the background queue so that it selects over the context being done or getting a response. With dpool used and this line fixed, things work as expected. What's the danger in using dpool for all blocking() and blockingMulti() in mux.go?

Do AlwaysPipelining and Dedicate() slow things down in any way, or have any other downsides?

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

I just tested AlwaysPipelining and Dedicate() and it still returns only after the timeout, not on cancellation. With AlwaysPipelining and Dedicate the error message is correct though, being context.Canceled, but it doesn't cancel immediately.

Fixing that !ok line and using dpool operates correctly, stopping the function immediately upon cancel().

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

I think because of this line https://github.com/redis/rueidis/blob/main/mux.go#L207 it never uses pipeline mode for blocking commands? That's why the dpool change is working but the above is not.

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

It actually looks like dpool is never used anywhere unless we make this change to use it within blocking() and blockingMulti(). spool is used in the streaming commands.

@rueian
Collaborator

rueian commented Nov 10, 2024

I just tested AlwaysPipelining and Dedicate() and it still returns only after the timeout, not on cancellation. With AlwaysPipelining and Dedicate the error message is correct though, being context.Canceled, but it doesn't cancel immediately.

I think it is caused by the bug mentioned previously #667 and fixed in the v1.0.50-alpha.2.

The !ok is intentional: if there is a deadline, we prefer to stay in sync mode since pipeline mode is heavier. Pipeline mode starts additional goroutines and generally adds latency to each request.

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

Oh nice, alpha.2 fixes it with AlwaysPipelining and Dedicate, it looks like!

Wouldn't AlwaysPipelining have the same issue of being heavier with more goroutines?

@rueian
Collaborator

rueian commented Nov 10, 2024

It actually looks like dpool is never used anywhere unless we make this change to use it within blocking() blockingMulti(). spool is used in the streaming commands.

dpool is only used by Dedicate() and Dedicated(), and its connections can go pipelining, while spool is a pool whose pipelining is always disabled and which is guaranteed to operate in sync mode.

Wouldn't AlwaysPipelining have the same issue of being heavier with more goroutines?

Yes, it does. AlwaysPipelining makes connections skip sync mode and always pipeline.

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

I see, having to set AlwaysPipelining on the whole client config then affects all the calls, even non-blocking ones which don't have this issue. So now all the calls will be a lot slower, it seems. It's also a little confusing to always have to use Dedicate() for blocking calls to get the right behaviour, and not for other calls. Shouldn't it just work out of the box more?

The issue with the ok still seems to be a problem though: if a context has a deadline then it uses sync mode, and I don't think it can be cancelled, since I didn't see anything that selects between getting a response and the context being done unless it's in background mode. Or perhaps something deeper, like the conn canceling when the cancel() function of the context is called?

@rueian
Collaborator

rueian commented Nov 10, 2024

Shouldn't it just work out of the box more?

I also want it to be more out of the box, but that requires some overhead.

a context has a deadline then it uses sync mode I don't think it can be cancelled as I didn't see anything that selects between getting a response and the context being done unless it's in background mode.

Or perhaps something deeper like the conn will cancel when the cancel() function of the context is called?

Yes, to summarize, pipeline mode can support both context cancellation and deadlines while the sync mode can only support deadlines because it doesn't have a background goroutine.

We could consider adding a background goroutine to the sync mode by doing things like this:

		done := make(chan struct{})
		once := sync.Once{}
		go func() {
			select {
			case <-done:
			case <-ctx.Done():
				once.Do(func() { p.Close() })
			}
		}()
		resp = p.syncDo(dl, ok, cmd)
		once.Do(func() { close(done) })
		if err := ctx.Err(); err != nil {
			resp = newErrResult(err)
		}

It allocates a channel and a goroutine for each request. It also seems heavier to me.

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

Yeah, that might be nice, because AlwaysPipelining is going to spawn those goroutines anyway, it seems.

So just to confirm because Dedicate() uses dpool and that can pipeline then it will support both cancellation and deadlines correct?

@rueian
Collaborator

rueian commented Nov 10, 2024

So just to confirm because Dedicate() uses dpool and that can pipeline then it will support both cancellation and deadlines correct?

With AlwaysPipelining, yes.

@rueian
Collaborator

rueian commented Nov 10, 2024

yeah that might be nice because AlwaysPipelining is going to do those goroutines anyways it seems like.

I don't really like the approach of allocating a temporary channel and goroutine for every request in sync mode. It will put some pressure on GC eventually. What the pipelining mode does is slightly different: it reuses background goroutines and channels and only allocates temporary goroutines when contexts are canceled.

@zeiler
Contributor Author

zeiler commented Nov 10, 2024

Great, thanks for the help on these issues! I'm not sure Dedicate() is documented well; that might be something to add to the README.

@rueian
Collaborator

rueian commented Nov 10, 2024

if this Dedicate() is documented well

It is indeed not well documented, but I think we can route blocking commands to the dpool instead of the current spool, so that you don't need to use Dedicate() explicitly, and we can document that AlwaysPipelining is needed to support manual cancellation.
