Use VStream() #120

Merged: notfelineit merged 31 commits into main from use-vstream on Feb 11, 2025

notfelineit (Contributor) commented Feb 4, 2025

Summary of changes

This PR transitions to use VTGate's VStream client instead of the specialized psdbconnectStream client. Some of the main changes during this transition are:

  • Processing individual VEvents instead of batched VEvents (Stream would batch them first, and then send this batched response to the connector)
  • Handling VStream COPY ("full sync") and RUNNING ("incremental sync") phases separately:
    • COPY
      • Explicitly wait for a COPY COMPLETED event
      • Return first VGTID that is after the stop VGTID
    • RUNNING
      • Wait for a VGTID that is after the stop VGTID
      • Return VGTID that is after stop VGTID
  • Explicitly handle "no new VGTIDs". This can happen if there was no new activity on the database after the stop position.

Overview

A single Airbyte sync is either a "full sync" or an "incremental sync".

  • "Full sync": just sync all the records up to the current stop position; uses VStream COPY
  • "Incremental sync": sync from a start position up to the current stop position; uses VStream binlog streaming (RUNNING)

In total, a single Airbyte sync will call VStream() at least once: the first call fetches the stop position, and any further calls stream rows up to that stop position.

A sync session calls the Read() method, which does:

  1. Call the VStream() API with VGTID set to "current" in order to get a stop position
  2. Loop this until we reach the first VGTID after the stop position:
    • If there is a start position (incremental sync):
      • Call VStream() with the VGTID set to start position
      • Process VStream() response events:
        • If VGTID event: update the current position to VGTID event's VGTID
        • If FIELD event: save fields for use later
        • If ROW event: save rows for queueing later
      • While processing VStream() response events:
        • If VGTID has advanced past the stop position, flush the queued Airbyte records to Airbyte, and return the current VGTID as the start position for the next incremental sync
        • If the context deadline is exceeded (context.DeadlineExceeded) before we have advanced past the stop position, still flush records, because this indicates there are no new VGTIDs
        • If we came across any rows, queue them as Airbyte records
    • If there is no start position (full sync):
      • Call VStream() with VGTID set to empty (""). This specifies VStream COPY
      • Process VStream() response events like in incremental sync, with one addition:
        • If COPY_COMPLETED event: note that we have completed the copy phase
      • While processing VStream() response events:
        • If we've completed the copy phase, and VGTID has advanced past the stop position, flush the queued Airbyte records to Airbyte, and return the current VGTID as the start position for the next incremental sync
        • If the context deadline is exceeded (context.DeadlineExceeded) before we have advanced past the stop position, still flush records, because this indicates there are no new VGTIDs
        • If we came across any rows, queue them as Airbyte records
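
The per-event decisions listed above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the connector's actual code: `event`, `syncState`, and `handleEvent` are hypothetical names, and positions are simplified to integers, whereas real positions are GTID sets.

```go
package main

import "fmt"

// eventType is a simplified stand-in for the Vitess VEvent types named above.
type eventType int

const (
	evVGTID eventType = iota
	evField
	evRow
	evCopyCompleted
)

// event is a hypothetical, simplified VEvent. ASSUMPTION: positions are
// plain ints here only to keep the ordering logic easy to follow.
type event struct {
	typ      eventType
	position int      // set for evVGTID
	fields   []string // set for evField
	row      []string // set for evRow
}

// syncState tracks what one sync has seen so far.
type syncState struct {
	waitForCopyCompleted bool // true for a full sync (VStream COPY)
	canFinishSync        bool // stop position reached, or copy phase done
	position             int
	fields               []string
	queued               [][]string
}

// handleEvent applies one event and reports whether the sync can exit
// and flush its queued records.
func (s *syncState) handleEvent(e event, stop int) bool {
	switch e.typ {
	case evVGTID:
		s.position = e.position
		// Incremental sync: reaching the stop position means every
		// row we need has been seen.
		if !s.waitForCopyCompleted && s.position >= stop {
			s.canFinishSync = true
		}
	case evField:
		s.fields = e.fields
	case evRow:
		s.queued = append(s.queued, e.row)
	case evCopyCompleted:
		// Only expected during a full sync.
		s.canFinishSync = true
	}
	// Exit only when finishing is allowed AND the position is strictly
	// past the stop position.
	return s.canFinishSync && s.position > stop
}

func main() {
	stop := 7
	s := &syncState{position: 5} // incremental sync starting at 5
	stream := []event{
		{typ: evField, fields: []string{"id", "uuid"}},
		{typ: evVGTID, position: 6},
		{typ: evRow, row: []string{"1", "a"}},
		{typ: evVGTID, position: 7},
		{typ: evVGTID, position: 8},
	}
	for _, e := range stream {
		if s.handleEvent(e, stop) {
			fmt.Printf("flush %d record(s); next start position is %d\n", len(s.queued), s.position)
			return
		}
	}
	fmt.Println("stream ended before passing the stop position")
}
```

Keeping "may finish" (`canFinishSync`) separate from "position is past stop" is what lets the same exit check serve both the full and the incremental sync.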

notfelineit (Contributor Author) commented Feb 4, 2025

Logs will look something like:

{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: Checking connection"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: State file detected, parsing provided file state.json"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: Syncing stream t1 with sync mode incremental"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: Using serialized cursor for stream test-connectors:t1"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] Peeking to see if there's any new rows"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] New rows found, syncing rows for 1m0s"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] Syncing rows from cursor [shard:\"-\"  keyspace:\"test-connectors\"  position:\"MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-74,e1e896df-dae3-11ef-895b-626e6780cb50:1-15610,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-96\"]"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] Will be waiting for COPY_COMPLETED event? false"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] Requesting to sync from cursor position [MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-74,e1e896df-dae3-11ef-895b-626e6780cb50:1-15610,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-96] to stop cursor position [MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-74,e1e896df-dae3-11ef-895b-626e6780cb50:1-17634,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-96] in cells planetscale_operator_default; using last known PK: false"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] FIELD event found, setting fields to [name:\"id\"  type:INT64  table:\"t1\"  org_table:\"t1\"  database:\"test-connectors\"  org_name:\"id\"  column_length:20  charset:63  flags:49667  column_type:\"bigint\" name:\"uuid\"  type:VARCHAR  table:\"t1\"  org_table:\"t1\"  database:\"test-connectors\"  org_name:\"uuid\"  column_length:1024  charset:255  column_type:\"varchar(256)\"]"}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] Stop position [MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-74,e1e896df-dae3-11ef-895b-626e6780cb50:1-17634,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-96] found. Waiting for next VGTID after stop position."}}
{"type":"LOG","log":{"level":"INFO","message":"PlanetScale Source :: [test-connectors:primary:t1 shard : -] Exiting sync and flushing records because current position MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-74,e1e896df-dae3-11ef-895b-626e6780cb50:1-17635,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-96 has passed stop position MySQL56/0d5afdd6-da80-11ef-844c-26dc1854a614:1-74,e1e896df-dae3-11ef-895b-626e6780cb50:1-17634,e50c022a-dade-11ef-8083-d2b0b749d1bb:1-96"}}

@@ -189,9 +195,12 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
if lcErr != nil {
	return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
if latestCursorPosition == "" {
Contributor Author

Necessary because if we get an empty stop position, the sync will run indefinitely.

how would we get an empty stop position? from the user setting that or from VStream sending it somehow?

Contributor Author

When I tested locally (e2e), there was one case where I got an empty string in an error case. I forgot the exact conditions but thought it would be good to handle.


// the last synced VGTID is not at least, or after the current VGTID
- if currentPosition.Position != "" && !positionAtLeast(latestCursorPosition, currentPosition.Position) {
+ if currentPosition.Position != "" && !positionAfter(latestCursorPosition, currentPosition.Position) {
notfelineit (Contributor Author), Feb 5, 2025

If the stop position is behind the start position, exit the sync.

is !positionAfter compatible with the two positions being identical? reads that way.

Contributor Author

Yeah - if they are equal, !positionAfter will return true (positionAfter will be false)
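
To make the difference between the three comparisons concrete, here is a simplified sketch. It is not the connector's implementation: `parsePosition` is a hypothetical helper that assumes each server UUID has a single interval starting at 1, which real GTID sets do not guarantee. A real implementation would more likely lean on Vitess's own GTID set handling.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePosition turns "MySQL56/uuid:1-74,uuid2:1-96" into a map of
// uuid -> highest sequence number.
// ASSUMPTION: one contiguous interval per UUID, starting at 1.
func parsePosition(pos string) map[string]int {
	out := map[string]int{}
	pos = strings.TrimPrefix(pos, "MySQL56/")
	for _, part := range strings.Split(pos, ",") {
		uuid, interval, ok := strings.Cut(part, ":")
		if !ok {
			continue
		}
		// Take the upper bound of "1-74"; a bare "74" is its own bound.
		_, hi, found := strings.Cut(interval, "-")
		if !found {
			hi = interval
		}
		n, err := strconv.Atoi(hi)
		if err != nil {
			continue
		}
		out[uuid] = n
	}
	return out
}

// positionAtLeast reports whether a contains everything in b.
func positionAtLeast(a, b string) bool {
	pa, pb := parsePosition(a), parsePosition(b)
	for uuid, n := range pb {
		if pa[uuid] < n {
			return false
		}
	}
	return true
}

// positionEqual reports whether a and b contain each other.
func positionEqual(a, b string) bool {
	return positionAtLeast(a, b) && positionAtLeast(b, a)
}

// positionAfter reports whether a is strictly ahead of b.
func positionAfter(a, b string) bool {
	return positionAtLeast(a, b) && !positionEqual(a, b)
}

func main() {
	// Shortened, made-up UUIDs for readability.
	stop := "MySQL56/aaa:1-74,bbb:1-17634"
	next := "MySQL56/aaa:1-74,bbb:1-17635"
	fmt.Println("stop after stop:   ", positionAfter(stop, stop))
	fmt.Println("stop at least stop:", positionAtLeast(stop, stop))
	fmt.Println("next after stop:   ", positionAfter(next, stop))
}
```

With identical positions, `positionAtLeast` holds but `positionAfter` does not, so `!positionAfter(stop, start)` also exits the sync when the two are equal.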

if err != nil {
	return tc, 0, err
}
if conn != nil {
Contributor Author

For reviewers: will this correctly defer closing the connection?

looks like it would.

when err != nil will it always be the case that conn == nil?

Yeah, although conn should not be nil if err was nil. So you could shorten it to:

	vtgateClient, conn, err := p.initializeVTGateClient(ctx, ps)
	if err != nil {
		return tc, 0, err
	}
	defer conn.Close()

Contributor Author

Thank you both! I'll simplify.

Contributor Author

Realized we don't mock conn in testing, and that's likely the reason for that behavior 🤔 If that's alright, I'd like to handle mocking properly in a separate PR so that we no longer need the nil check.
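
For readers following this thread, the pattern under discussion looks roughly like the sketch below. `fakeConn`, `dial`, and `syncOnce` are hypothetical names used only for illustration. The point is the usual Go contract: once the error check has passed, the connection is expected to be non-nil, so `defer conn.Close()` needs no nil check.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeConn is a hypothetical stand-in for the gRPC connection pool.
type fakeConn struct{ closed bool }

func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// dial is a hypothetical initializer. It follows the common contract
// that a non-nil error means the returned closer must not be used.
func dial(fail bool) (io.Closer, error) {
	if fail {
		return nil, errors.New("dial failed")
	}
	return &fakeConn{}, nil
}

// syncOnce opens a connection and guarantees it is closed on return.
// The connection is returned only so callers can observe that it was closed.
func syncOnce(fail bool) (io.Closer, error) {
	conn, err := dial(fail)
	if err != nil {
		return nil, err
	}
	// err is nil here, so conn is expected to be non-nil.
	defer conn.Close()

	// ... stream rows here ...
	return conn, nil
}

func main() {
	conn, err := syncOnce(false)
	fmt.Println("error:", err, "closed:", conn.(*fakeConn).closed)

	_, err = syncOnce(true)
	fmt.Println("error:", err)
}
```

A test mock that returns a nil connection together with a nil error breaks this contract, which is consistent with the explanation given above for why the nil check was needed.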

	p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCopy phase completed. Waiting for next VGTID after stop position.", preamble))
	watchForVgGtidChange = true
}
if !waitForCopyCompleted && positionEqual(tc.Position, stopPosition) {
notfelineit (Contributor Author), Feb 5, 2025

During an incremental sync, we'll have one VStream response for every VGTID from the start position to the stop position, so it's safe to stop looking for rows at the exact stop position.

Quoting: "so it's safe to stop looking for rows at the exact stop position."
Does setting watchForVgGtidChange = true make it stop looking for the exact stop position?

Contributor Author

I updated the variable name to "canFinishSync", as I think that better represents its purpose. Essentially it asks: "Have we finished queueing all the records we need, given the stop position or the completed copy phase?"

// the current VGTID may have already advanced past the stop position.
watchForVgGtidChange = watchForVgGtidChange || positionAtLeast(tc.Position, stopPosition)
for _, result := range res.Result {
if waitForCopyCompleted && copyCompletedSeen {
notfelineit (Contributor Author), Feb 5, 2025

During a full sync, we do not care about finding the exact stop position; we only care about collecting rows until the copy phase completes.

}

// Exit sync and flush records once the VGTID position is past the desired stop position, and we're no longer waiting for COPY phase to complete
if watchForVgGtidChange && positionAfter(tc.Position, stopPosition) {
Contributor Author

Once we know we can stop looking for rows, and the current position has advanced past the stop position, we can exit syncing.

based on what this watchForVgGtidChange does, maybe a better variable name would be watchForStopPosition?

qr := sqltypes.Proto3ToResult(result)
for _, row := range qr.Rows {
resultCount += 1
sqlResult := &sqltypes.Result{
- Fields: result.Fields,
+ Fields: fields,
Contributor Author

These are the same fields we found from the FIELD VEvent.

return gtid, nil
}
}
return "", errors.New("unable to find VEvent of VGTID type to use as stop cursor")
Contributor Author

Only find a stop position from the VGTID VEvent.

Contributor Author

@mattlord is that a safe assumption? that there will always be a VGTID event to get the stop position from?

Yes, you should get the snapshot's GTID position as the second event in the first batch, for example:

[type:VGTID  vgtid:{shard_gtids:{keyspace:"customer"  shard:"-80"}  shard_gtids:{keyspace:"customer"  shard:"80-"  gtid:"MySQL56/17e6f390-e4a7-11ef-928d-554558eb9218:1-57"}}  keyspace:"customer"  shard:"80-"]

Contributor Author

Thank you! 🫶
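
The lookup discussed in this thread can be sketched as below. The `vevent` type and `findStopPosition` function are hypothetical simplifications, not the Vitess protobuf types. The sketch assumes, as stated above, that the first batch contains a VGTID event, and returns an error otherwise.

```go
package main

import (
	"errors"
	"fmt"
)

// vevent is a hypothetical, simplified VEvent: a type name plus the
// GTID strings of its shard_gtids entries.
type vevent struct {
	typ   string
	gtids []string
}

// findStopPosition returns the position carried by the first VGTID
// event in the batch that has a non-empty GTID.
func findStopPosition(events []vevent) (string, error) {
	for _, e := range events {
		if e.typ == "VGTID" && len(e.gtids) > 0 && e.gtids[0] != "" {
			return e.gtids[0], nil
		}
	}
	return "", errors.New("unable to find VEvent of VGTID type to use as stop cursor")
}

func main() {
	// "OTHER" is a placeholder for whatever event precedes the VGTID.
	batch := []vevent{
		{typ: "OTHER"},
		{typ: "VGTID", gtids: []string{"MySQL56/17e6f390-e4a7-11ef-928d-554558eb9218:1-57"}},
	}
	pos, err := findStopPosition(batch)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("stop position:", pos)
}
```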

}
}

func toTopoTabletType(tabletType psdbconnect.TabletType) topodata.TabletType {

notfelineit marked this pull request as ready for review February 5, 2025 21:29
maxenglander left a comment

i didn't review the tests yet because it's a lot, but reviewed most everything else. looking pretty good to me, just left a few comments/questions for now.

- func (x *connectSyncClientMock) Recv() (*psdbconnect.SyncResponse, error) {
- if x.lastResponseSent >= len(x.syncResponses) {
+ func (x *vtgateVStreamClientMock) Recv() (*vtgate.VStreamResponse, error) {
+ if x.lastResponseSent >= len(x.vstreamResponses) {

a little unclear to me what lastResponseSent is tracking.

Contributor Author

So I am pretty sure last response sent keeps track of which index of mock sync responses should be sent next!
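
A minimal sketch of such an index-tracking mock is shown below. The `response` type and the choice to return `io.EOF` once the responses are exhausted are assumptions made for illustration. The diff above does not show what the real mock returns at that point.

```go
package main

import (
	"fmt"
	"io"
)

// response is a hypothetical stand-in for a VStream response.
type response struct {
	events []string
}

// vstreamClientMock replays canned responses in order.
type vstreamClientMock struct {
	lastResponseSent int // index of the NEXT response to hand out
	responses        []*response
}

// Recv returns the next canned response.
// ASSUMPTION: io.EOF signals that the mock has nothing left to send.
func (m *vstreamClientMock) Recv() (*response, error) {
	if m.lastResponseSent >= len(m.responses) {
		return nil, io.EOF
	}
	r := m.responses[m.lastResponseSent]
	m.lastResponseSent++
	return r, nil
}

func main() {
	m := &vstreamClientMock{responses: []*response{
		{events: []string{"VGTID"}},
		{events: []string{"ROW"}},
	}}
	for {
		r, err := m.Recv()
		if err == io.EOF {
			fmt.Println("stream finished")
			return
		}
		fmt.Println("received", r.events)
	}
}
```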

if waitForCopyCompleted && copyCompletedSeen {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sCopy phase completed. Waiting for next VGTID after stop position.", preamble))
watchForVgGtidChange = true

is this a typo? should it just be VGtid instead of VgGtid?

}

func (p PlanetScaleEdgeDatabase) initializeVTGateClient(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, grpcclient.ConnPool, error) {
if p.vtgateClientFn == nil {

do we need to support both the p.vtgateClientFn == nil path as well as the != nil path?

Contributor Author

Yeah, unfortunately this is a remnant of the old testing style. I think there's opportunity for improvements on the tests and mocking in a next PR 🙏


"google.golang.org/grpc"
"vitess.io/vitess/go/vt/proto/vtgate"
vtgateservice "vitess.io/vitess/go/vt/proto/vtgateservice"

Just noting that this alias is not needed since it matches the package name.

Contributor Author

Thank you! 😄

for _, event := range res.Events {
	switch event.Type {
	case binlogdata.VEventType_VGTID:
		vgtid := event.GetVgtid().ShardGtids[0]

This assumes there's only a single shard. The position will not be the same for all shards. Here's an example event:

[type:VGTID  vgtid:{shard_gtids:{keyspace:"customer"  shard:"-80"  gtid:"MySQL56/0f2b79a6-e4a7-11ef-a8a5-e8942912de27:1-58"}  shard_gtids:{keyspace:"customer"  shard:"80-"  gtid:"MySQL56/17e6f390-e4a7-11ef-928d-554558eb9218:1-57"}}  keyspace:"customer"  shard:"-80" type:COMMIT  timestamp:1738859305  current_time:1738859305605332000  keyspace:"customer"  shard:"-80"]

We should discuss this. Perhaps I'm missing something here. Also seems odd to me that it's a table cursor. Are the syncs always for a single table?

I think that we want to loop over the ShardGtids and join them. We have code in Vitess to get the union of two GTID sets: replication.GTIDSet.Union

Or is this somehow per table per shard here?

Contributor Author

Your confusion is valid - the code here runs per table per shard. There are likely some optimizations we can make, as I don't 100% understand why the connector was originally built to do one VStream per table per shard.

Contributor Author

For reference, the code here is executed within two for loops: for table in tables, then for shard in shards: https://github.com/planetscale/airbyte-source/blob/main/cmd/airbyte-source/read.go#L98-L110
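
As a rough illustration of the "per table, per shard" structure described in this thread, the sketch below loops over tables and shards and picks the position for each shard by name. `shardGtid` and `gtidForShard` are hypothetical; selecting by shard name is one defensive alternative to indexing `ShardGtids[0]`, not what the PR does. The table names are made up, and the sample positions are the ones from the example event quoted above.

```go
package main

import "fmt"

// shardGtid is a simplified stand-in for one shard_gtids entry.
type shardGtid struct {
	keyspace, shard, gtid string
}

// gtidForShard returns the position recorded for the named shard.
func gtidForShard(gtids []shardGtid, shard string) (string, bool) {
	for _, sg := range gtids {
		if sg.shard == shard {
			return sg.gtid, true
		}
	}
	return "", false
}

func main() {
	vgtid := []shardGtid{
		{"customer", "-80", "MySQL56/0f2b79a6-e4a7-11ef-a8a5-e8942912de27:1-58"},
		{"customer", "80-", "MySQL56/17e6f390-e4a7-11ef-928d-554558eb9218:1-57"},
	}
	tables := []string{"t1", "t2"} // hypothetical table names
	shards := []string{"-80", "80-"}

	// One stream per table per shard, as in the existing connector.
	for _, table := range tables {
		for _, shard := range shards {
			if gtid, ok := gtidForShard(vgtid, shard); ok {
				fmt.Printf("table=%s shard=%s position=%s\n", table, shard, gtid)
			}
		}
	}
}
```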

@@ -400,8 +457,33 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableName
}
}

- // positionAtLeast returns true if position `a` is equal to or after position `b`
- func positionAtLeast(a string, b string) bool {
+ func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard string, keyspace string, gtid string) *vtgate.VStreamRequest {

OK, so I see that this is a single table and a single shard. So perhaps this answers my question above...

Contributor Author

Yes! Exactly 😄

p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records due to error: %+v", preamble, err))
if s, ok := status.FromError(err); ok && s.Code() == codes.DeadlineExceeded && canFinishSync {
	// No next VGTID found, but we previously found stop position or finished VStream COPY phase
	p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because no new VGTID found after stop position %+v", preamble, stopPosition))
notfelineit (Contributor Author), Feb 10, 2025

FYI, this is the line that addresses:

Explicitly handle "no new VGTIDs". This can happen if there was no new activity on the database after the stop position.

mattlord

Nice work on this! I approved, for what that's worth. 🙂

notfelineit merged commit e84e4ba into main on Feb 11, 2025
3 checks passed
notfelineit deleted the use-vstream branch February 11, 2025 01:27