
changefeedccl: Ensure correct file ordering. #90011

Merged 1 commit on Oct 20, 2022

Conversation

@miretskiy (Contributor) commented Oct 14, 2022

When async flushing is enabled, the following sequence of events is possible (even if very unlikely):

  • k@t1 is emitted, causing async flush to write file f1
  • k@t2 is emitted, causing async flush to write file f2
  • f2 is written out before f1.

In this unlikely scenario -- and the reason it is unlikely is that megabytes of data have to be generated to cause a flush, unless the file_size parameter was pathologically small -- a client that reads the contents of the directory right after step 2, and again after step 3, would first observe k@t2 and then k@t1, which is a violation of ordering guarantees.

This PR fixes the issue by adopting a queue: if a flush is already in flight, the next flush is queued behind it.

It is possible that this simple approach may reduce throughput. That would happen if the underlying storage (S3 or GCP) cannot keep up with writing out data before the next file comes in. However, such a situation is unlikely for two reasons: one, the rest of the changefeed machinery must be able to generate one or more files' worth of data before the previous flush completes -- and that task is in and of itself not trivial; and two, the changefeed must have enough memory allocated to it that the pushback mechanism does not trigger. This assumption was validated in a reasonably sized test: a non-zero queue depth was never observed. Nonetheless, this PR also adds a log message to help detect the situation where the sink cannot keep up with the incoming data rate.

A more complex solution -- for example, allowing unlimited in-flight requests during backfill -- may be revisited later if the above assumption proves incorrect.
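To make the queueing approach concrete, here is a minimal sketch of the idea (hypothetical names and types; this is not the code in sink_cloudstorage.go): files are appended under a lock, and a single drain goroutine writes them out strictly in order, so a later file can never reach storage before an earlier one.

package sketch // illustration only

import "sync"

// pendingFile stands in for a buffered changefeed file and its destination.
type pendingFile struct {
	data []byte
	dest string
}

// orderedFlusher serializes async flushes so files reach storage in the
// order they were produced.
type orderedFlusher struct {
	mu  sync.Mutex
	q   []pendingFile // oldest file first
	err error         // first flush error observed
}

// enqueue appends a file and starts a drain goroutine only when the queue
// transitions from empty to non-empty, so at most one flush is in flight.
func (f *orderedFlusher) enqueue(p pendingFile, write func(pendingFile) error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.q = append(f.q, p)
	if len(f.q) == 1 {
		go f.drain(write)
	}
}

// drain writes queued files strictly in order. It returns only after popping
// the last element while holding the lock, so enqueue can safely restart it.
func (f *orderedFlusher) drain(write func(pendingFile) error) {
	for {
		f.mu.Lock()
		head := f.q[0]
		f.mu.Unlock()

		err := write(head) // the slow part: the upload to cloud storage

		f.mu.Lock()
		if err != nil && f.err == nil {
			f.err = err // remember the first error; a real sink would surface it to callers
		}
		f.q = f.q[1:]
		done := len(f.q) == 0
		f.mu.Unlock()
		if done {
			return
		}
	}
}

If the queue does build a backlog, that is exactly the condition the new log message is meant to surface.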

Fixes #89683

Release note: None.

miretskiy requested a review from ajwerner on October 14, 2022 20:49
miretskiy requested a review from a team as a code owner on October 14, 2022 20:49
@cockroach-teamcity (Member) commented:

This change is Reviewable

Comment on lines 670 to 672:

go func() {
	s.asyncFlush(file, dest)
}()
Contributor:

Why is this not go s.asyncFlush(file, dest)?

Contributor Author (miretskiy):

why not :)
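For readers following the exchange: both forms start the flush in its own goroutine, and the closure only buys something if extra work has to happen around the call. A sketch using the names from the snippet above:

go s.asyncFlush(file, dest) // direct form: arguments evaluated now, result discarded

go func() {
	// the wrapper form matters only if something else must happen around
	// the call, e.g. recording a metric or recovering from a panic
	s.asyncFlush(file, dest)
}()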

pop := func(err error) destFile {
	s.flushQueue.Lock()
	defer s.flushQueue.Unlock()
	s.flushQueue.err = err
Contributor:

check to make sure this isn't populated already? It seems to me like this is racy because you don't check for an error when you go to append to the queue. Imagine the case where you have one flush call (and so nothing in the queue) and then it gets an error and assigns here. The goroutine returns but another call to flush above launches a new goroutine which then succeeds and clobbers this error.

I feel like you should just simplify this code so that the file currently being flushed is at the front of the queue and it gets popped after it has succeeded. I think it'll simplify some things.

Contributor Author (miretskiy):

Good catch.
The reality is that I used to have exactly that code, but while undoing a more complex version I dropped the check I should have kept.

s.flushErr.CompareAndSwap(nil, &flushError{error: err})
s.flushQueue.Lock()
defer s.flushQueue.Unlock()
if s.flushQueue.q == nil {
Contributor:

If we launch the goroutine here we don't append to the queue, so it's still nil if we were to come here again, and we'd launch yet another goroutine, which seems bad. It seems to me like we ought to just always append to the queue and then have asyncFlush not take arguments.

Contributor Author (miretskiy):

Ack. Good point.

}

s.flushQueue.q = append(s.flushQueue.q, destFile{file: file, dest: dest})
if len(s.flushQueue.q) == 1 {
Contributor Author (miretskiy):

I'm going to rerun the tests to see if I can hit > 1 queued item.

Contributor Author (miretskiy):

Still wasn't able to get that; basically, in my test I was generating 1 file every 300ms or so, and it appears that most flushes took well under 100ms. I suppose it is possible for this queue depth to be non-zero, but that's fine -- as the PR says, if it becomes a problem, we can revisit.

Contributor:

Why not just set the file size to like 1 byte in testing?

Contributor Author (miretskiy):

Nah, I mean I wanted to see how this runs in production. Of course we can do something like that in a unit test.

Contributor Author (miretskiy):

Verified with a crazy small file size (4 KB); files are queued (and then some):

I221016 20:46:26.324904 164297 ccl/changefeedccl/sink_cloudstorage.go:671 ⋮ [n2,f‹eb3415c6›,job=805708385569374209] 1565589  changefeed flush queue depth 667347; ~2669388000 bytes to flush

miretskiy force-pushed the async branch 2 times, most recently from c538c4f to 8d57dd5 on October 15, 2022 13:53
@ajwerner (Contributor) left a comment:

Reviewed all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/sink_cloudstorage.go line 622 at r4 (raw file):

// waitAsyncFlush waits until all async flushes complete.
func (s *cloudStorageSink) waitAsyncFlush() error {
	s.flushGroup.Wait()

is this definitely safe? You're not allowed to add to a wait group while you're waiting for it. It seems to me that something could be added to the queue while a wait call is going on. This can panic! https://pkg.go.dev/sync#WaitGroup.Add

@miretskiy (Contributor Author):

> is this definitely safe? You're not allowed to add to a wait group while you're waiting for it. It seems to me that something could be added to the queue while a wait call is going on. This can panic! https://pkg.go.dev/sync#WaitGroup.Add

blarg -- excellent catch.
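For reference, a sketch of one way to avoid the WaitGroup hazard (hypothetical names; not the code the PR ultimately used): track the number of in-flight flushes under a mutex and wait on a condition variable, which, unlike WaitGroup, is safe to wait on while new work is still being added.

package sketch // illustration only

import "sync"

// flushTracker counts in-flight flushes and lets callers wait for zero.
type flushTracker struct {
	mu       sync.Mutex
	cond     sync.Cond // broadcast when inFlight drops to zero
	inFlight int
}

func newFlushTracker() *flushTracker {
	t := &flushTracker{}
	t.cond.L = &t.mu
	return t
}

func (t *flushTracker) begin() {
	t.mu.Lock()
	t.inFlight++
	t.mu.Unlock()
}

func (t *flushTracker) end() {
	t.mu.Lock()
	t.inFlight--
	if t.inFlight == 0 {
		t.cond.Broadcast() // wake every waiter once nothing is in flight
	}
	t.mu.Unlock()
}

// wait blocks until there are no in-flight flushes. Unlike sync.WaitGroup,
// begin may be called concurrently with wait without tripping any checks.
func (t *flushTracker) wait() {
	t.mu.Lock()
	for t.inFlight != 0 {
		t.cond.Wait()
	}
	t.mu.Unlock()
}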

numGoRoutines int
}
// Signaled whenever changes are made to flushQueue struct.
flushC *sync.Cond
Contributor:

nit: put this in the flushQueue and don't make it a pointer.

Contributor Author (miretskiy):

ack. done.
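For illustration, what the nit amounts to (hypothetical field layout; sync.Mutex stands in for whatever mutex wrapper the real struct embeds): sync.Cond works fine as a value inside the struct it guards, provided its L field is pointed at the mutex before first use and the struct is not copied afterwards.

type flushQueue struct {
	sync.Mutex
	q      []destFile
	flushC sync.Cond // signaled whenever the queue changes; a value, not a pointer
}

func newFlushQueue() *flushQueue {
	fq := &flushQueue{}
	fq.flushC.L = &fq.Mutex // wire the condition variable to the queue's own mutex
	return fq
}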

miretskiy force-pushed the async branch 3 times, most recently from 33ef4cd to 05b69a4 on October 18, 2022 13:16
@ajwerner (Contributor) left a comment:

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/sink_cloudstorage.go line 624 at r6 (raw file):

// waitAsyncFlush waits until all async flushes complete.
func (s *cloudStorageSink) waitAsyncFlush() error {

is this really what you want when you call this? Don't you just want the currently in-flight files to be flushed? If it's the same thing because there's no concurrency, can you add a comment about that?


pkg/ccl/changefeedccl/sink_cloudstorage.go line 679 at r6 (raw file):

	if len(s.flushQueue.q) == 1 {
		// Start a new go routine if the flush queue was empty.
		s.flushQueue.numGoRoutines++

This numGoRoutines makes the reasoning complex. I'm having a hard time convincing myself of correctness. Maybe it'd be better if instead you launched a goroutine for each file you wanted to flush and then broadcasted whenever the head of the queue is flushed and have the goroutine associated with the file be the only goroutine to flush the head of the queue file. Then it'd be easy to reason about. One goroutine per file, created with that file. Right now, this code which passes the responsibility to some maybe previously created goroutine is complex for no good reason.
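A sketch of the per-file-goroutine design being suggested here (hypothetical code; the PR's final shape differs): each file gets its own goroutine, which waits on a condition variable until its file is at the head of the queue, writes it, pops it, and broadcasts so the owner of the next file can proceed.

package sketch // illustration only

import "sync"

type file struct {
	data []byte
	dest string
}

type orderedSink struct {
	mu    sync.Mutex
	cond  sync.Cond // broadcast whenever the head of the queue is popped
	queue []*file   // oldest first
}

func newOrderedSink() *orderedSink {
	s := &orderedSink{}
	s.cond.L = &s.mu
	return s
}

// flushAsync enqueues f and spawns the one goroutine responsible for exactly
// that file; ordering falls out of the head-of-queue check below.
func (s *orderedSink) flushAsync(f *file, write func(*file) error) {
	s.mu.Lock()
	s.queue = append(s.queue, f)
	s.mu.Unlock()

	go func() {
		s.mu.Lock()
		for s.queue[0] != f {
			s.cond.Wait() // not our turn yet
		}
		s.mu.Unlock()

		_ = write(f) // a real implementation would record the error somewhere

		s.mu.Lock()
		s.queue = s.queue[1:] // pop ourselves off the head
		s.cond.Broadcast()    // wake the goroutine owning the next file (and any waiters)
		s.mu.Unlock()
	}()
}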


pkg/ccl/changefeedccl/sink_cloudstorage.go line 695 at r6 (raw file):

		s.flushQueue.Lock()
		defer s.flushQueue.Unlock()
		defer s.flushQueue.flushC.Signal()

this seems wrong, what if you signal a waiter instead of a flusher?


pkg/ccl/changefeedccl/sink_cloudstorage.go line 702 at r6 (raw file):

		s.flushQueue.Lock()
		defer s.flushQueue.Unlock()
		defer s.flushQueue.flushC.Signal()

why signal and why every time you pop? Signal goes to just one waiter. Maybe broadcast, but like, why not broadcast only when the queue is empty given that's what waiters want.
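One way to read that suggestion (a hypothetical fragment using the names from the snippets above, with both pieces running under s.flushQueue's lock): the pop side only wakes waiters on the transition to empty, which is the condition waitAsyncFlush-style callers actually care about.

// pop side: notify only when the queue fully drains
s.flushQueue.q = s.flushQueue.q[1:]
if len(s.flushQueue.q) == 0 {
	s.flushQueue.flushC.Broadcast()
}

// wait side: block until nothing is queued
for len(s.flushQueue.q) > 0 {
	s.flushQueue.flushC.Wait()
}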

miretskiy force-pushed the async branch 2 times, most recently from 6410bb5 to 935abdc on October 18, 2022 23:20
@jayshrivastava (Contributor):

Looks very good to me :) I'm going to let Andrew give the final ✅

@ajwerner (Contributor) left a comment:

I like this much better

@ajwerner (Contributor) left a comment:

Terminology nit: both writing the file and waiting for the in-flight files to be written are called "flush". It's sort of confusing.

@@ -325,6 +332,8 @@ var partitionDateFormats = map[string]string{
 }
 var defaultPartitionFormat = partitionDateFormats["daily"]
 
+const flushQueueDepth = 256
Contributor:

nit: say something about this

Contributor Author (miretskiy):

ack; done.
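For example, the kind of doc comment the nit is after might look like this (invented wording; the value 256 is from the diff above, and the assumption that it sizes the async flush request buffer is mine):

// flushQueueDepth bounds how many async flush requests may be buffered
// before callers of the sink start blocking.
const flushQueueDepth = 256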

@@ -695,7 +766,10 @@ func (f *cloudStorageSinkFile) flushToStorage(
 // Close implements the Sink interface.
 func (s *cloudStorageSink) Close() error {
 	s.files = nil
-	return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close())
+	err := s.waitAsyncFlush(context.Background())
+	close(s.asyncFlushCh) // Signal flusher to exit.
Contributor:

nit: lowercase

Contributor Author (miretskiy):

done.
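Putting the Close diff above together with asyncFlushCh, the consuming side presumably looks something like the following (hypothetical sketch; only asyncFlushCh and waitAsyncFlush appear in the actual diff, and writeFile and setFlushError are invented stand-ins): the flusher drains the channel in order and exits once Close closes it.

// Hypothetical sketch of the flusher loop; not the PR's actual method.
func (s *cloudStorageSink) runFlusher(ctx context.Context) {
	// Ranging over the channel preserves request order and ends once
	// Close() has closed asyncFlushCh and the backlog is drained.
	for req := range s.asyncFlushCh {
		if err := s.writeFile(ctx, req); err != nil { // writeFile: invented stand-in
			s.setFlushError(err) // setFlushError: invented stand-in; first error wins
		}
	}
}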

@miretskiy (Contributor Author):

bors r+

craig bot commented Oct 20, 2022

Build succeeded:

craig bot merged commit 5d3f4df into cockroachdb:master on Oct 20, 2022
miretskiy deleted the async branch on December 2, 2022 21:48
Successfully merging this pull request may close these issues.

changefeedccl: Async flushes may produce out of order events