Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable max retries, increase read timeout to 5 minutes #121

Merged
merged 8 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/airbyte-source/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@
"description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }",
"order": 7
},
"max_retries": {
"type": "integer",
"title": "Max retries",
"default": 3,
"description": "The max number of times we continue syncing after potential errors",
"order": 8
},
"options": {
"type": "object",
"title": "Customize serialization",
Expand Down
9 changes: 7 additions & 2 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,14 @@ type vstreamClientMock struct {
vstreamFnInvokedCount int
}

type vstreamResponse struct {
response *vtgate.VStreamResponse
err error
}

type vtgateVStreamClientMock struct {
lastResponseSent int
vstreamResponses []*vtgate.VStreamResponse
vstreamResponses []*vstreamResponse
grpc.ClientStream
}

Expand All @@ -70,7 +75,7 @@ func (x *vtgateVStreamClientMock) Recv() (*vtgate.VStreamResponse, error) {
return nil, io.EOF
}
x.lastResponseSent += 1
return x.vstreamResponses[x.lastResponseSent-1], nil
return x.vstreamResponses[x.lastResponseSent-1].response, x.vstreamResponses[x.lastResponseSent-1].err
}

func (x *vstreamClientMock) CloseSession(context.Context, *vtgate.CloseSessionRequest, ...grpc.CallOption) (*vtgate.CloseSessionResponse, error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/internal/planetscale_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type PlanetScaleSource struct {
UseRdonly bool `json:"use_rdonly"`
StartingGtids string `json:"starting_gtids"`
Options CustomSourceOptions `json:"options"`
MaxRetries uint `json:"max_retries"`
}

type CustomSourceOptions struct {
Expand Down
157 changes: 93 additions & 64 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
err error
sErr error
currentSerializedCursor *SerializedCursor
syncMode string
)

tabletType := psdbconnect.TabletType_primary
Expand All @@ -185,59 +186,69 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
}

currentPosition := lastKnownPosition
if currentPosition.Position == "" || currentPosition.LastKnownPk != nil {
syncMode = "full"
} else {
syncMode = "incremental"
}

table := s.Stream
readDuration := 1 * time.Minute
readDuration := 5 * time.Minute
maxRetries := ps.MaxRetries

preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard)

for {
p.Logger.Log(LOGLEVEL_INFO, preamble+"Peeking to see if there's any new rows")
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
if lcErr != nil {
p.Logger.Log(LOGLEVEL_ERROR, preamble+fmt.Sprintf("Error fetching latest cursor position: %+v", lcErr))
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
if latestCursorPosition == "" {
p.Logger.Log(LOGLEVEL_ERROR, preamble+fmt.Sprintf("Error fetching latest cursor position, was empty string: %+v", latestCursorPosition))
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
p.Logger.Log(LOGLEVEL_INFO, preamble+"Peeking to see if there's any new GTIDs")
stopPosition, lcErr := p.getStopCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
if lcErr != nil {
p.Logger.Log(LOGLEVEL_ERROR, preamble+fmt.Sprintf("Error fetching latest cursor position: %+v", lcErr))
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
if stopPosition == "" {
p.Logger.Log(LOGLEVEL_ERROR, preamble+fmt.Sprintf("Error fetching latest cursor position, was empty string: %+v", stopPosition))
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
Comment on lines +201 to +210
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled:

  • Fetching the desired stop position
  • Validating the desired stop position

Out of the for loop so we aren't "advancing the desired stop position" on any kind of error. This helps the sync finish faster.


// the last synced VGTID is not at least, or after the current VGTID
if currentPosition.Position != "" && !positionAfter(latestCursorPosition, currentPosition.Position) {
p.Logger.Log(LOGLEVEL_INFO, preamble+"No new rows found, exiting")
return TableCursorToSerializedCursor(currentPosition)
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"Syncing rows from cursor [%v]", currentPosition))

currentPosition, recordCount, err := p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
if currentPosition.Position != "" {
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
// if we failed to serialize here, we should bail.
return currentSerializedCursor, errors.Wrap(sErr, "unable to serialize current position")
}
// the last synced VGTID is not at least, or after the current VGTID
if currentPosition.Position != "" && !positionAfter(stopPosition, currentPosition.Position) {
p.Logger.Log(LOGLEVEL_INFO, preamble+"No new GTIDs found, exiting")
return TableCursorToSerializedCursor(currentPosition)
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"New GTIDs found, syncing for %v", readDuration))

var syncCount uint = 0
totalRecordCount := 0

for {
syncCount += 1
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sStarting sync #%v", preamble, syncCount))
newPosition, recordCount, err := p.sync(ctx, syncMode, currentPosition, stopPosition, table, ps, tabletType, readDuration)
totalRecordCount += recordCount
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
// if we failed to serialize here, we should bail.
return currentSerializedCursor, errors.Wrap(sErr, "unable to serialize current position")
}
if err != nil {
if s, ok := status.FromError(err); ok {
// if the error is anything other than server timeout, keep going
if s.Code() != codes.DeadlineExceeded {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vGot error [%v], returning with cursor [%v] after server timeout", preamble, s.Code(), currentPosition))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%v%v records synced after %v syncs. Got error [%v], returning with cursor [%v] after gRPC error", preamble, totalRecordCount, syncCount, s.Code(), currentPosition))
if syncCount >= maxRetries {
return currentSerializedCursor, nil
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%v%v records synced. Continuing with cursor after recoverable error %+v", preamble, recordCount, err))
}
} else if errors.Is(err, io.EOF) {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vFinished reading %v records for table [%v]", preamble, recordCount, table.Name))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vFinished reading %v records after %v syncs for table [%v]", preamble, totalRecordCount, syncCount, table.Name))
return currentSerializedCursor, nil
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vNon-grpc error [%v]]", preamble, err))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%v%v records synced after %v syncs. Got error [%v], returning with cursor [%v] after server timeout", preamble, totalRecordCount, syncCount, err, currentPosition))
return currentSerializedCursor, err
}
}
currentPosition = newPosition
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%vContinuing to next sync #%v. Set next sync start position to [%+v].", preamble, syncCount+1, currentPosition))
}
}

func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, int, error) {
func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, int, error) {
preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", s.Namespace, TabletTypeToString(tabletType), s.Name, tc.Shard)

defer p.Logger.Flush()
Expand All @@ -247,6 +258,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
var (
err error
vtgateClient vtgateservice.VitessClient
fields []*query.Field
)

vtgateClient, conn, err := p.initializeVTGateClient(ctx, ps)
Expand All @@ -257,18 +269,12 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
defer conn.Close()
}

// If there is a LastKnownPk, that means we were in a copy phase
// Setting position to "" specifies COPY phase in the VStream API
if tc.LastKnownPk != nil {
tc.Position = ""
}

waitForCopyCompleted := tc.Position == ""
copyCompletedSeen := false
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill be waiting for COPY_COMPLETED event? %v", preamble, waitForCopyCompleted))

vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position)
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting to sync from cursor position [%v] to stop cursor position [%v] in cells %v; using last known PK: %v", preamble, tc.Position, stopPosition, vtgateReq.Flags.Cells, tc.LastKnownPk != nil))
vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk)
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq))
c, err := vtgateClient.VStream(ctx, vtgateReq)

if err != nil {
Expand All @@ -281,22 +287,27 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
keyspaceOrDatabase = ps.Database
}

isFullSync := syncMode == "full"
copyCompletedSeen := false
// Can finish sync once we've synced to the stop position, or finished the VStream COPY phase
canFinishSync := false
resultCount := 0

var fields []*query.Field

for {
res, err := c.Recv()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattlord I found that on the "no more VGTIDs" case, the process was blocking here. So I opted to just keep the regular context.WithTimeout (instead of a timer like we discussed), but increased the wait time from 1 minute to 5 minutes.

After testing locally on a similarly higher traffic table & database, 5 minutes was more successful than the 1 minute timeout.

if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.DeadlineExceeded && canFinishSync {
// No next VGTID found, but we previously found stop position or finished VStream COPY phase
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because no new VGTID found after stop position %+v", preamble, stopPosition))
if s, ok := status.FromError(err); ok && s.Code() == codes.DeadlineExceeded {
// No next VGTID found
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because no new VGTID found after last position or deadline exceeded %+v", preamble, tc))
return tc, resultCount, err
} else if err == io.EOF {
// EOF is an acceptable error indicating VStream is finished.
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sExiting sync and flushing records because EOF encountered at position %+v", preamble, tc))
return tc, resultCount, io.EOF
} else {
p.Logger.Log(LOGLEVEL_ERROR, fmt.Sprintf("%sExiting sync and flushing records due to error: %+v", preamble, err))
return tc, resultCount, err
}
p.Logger.Log(LOGLEVEL_ERROR, fmt.Sprintf("%sExiting sync and flushing records due to error: %+v", preamble, err))
return tc, resultCount, err
}

var rows []*query.QueryResult
Expand All @@ -305,11 +316,18 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
case binlogdata.VEventType_VGTID:
vgtid := event.GetVgtid().ShardGtids[0]
if vgtid != nil {
// Update cursor to new VGTID
tc = &psdbconnect.TableCursor{
Shard: tc.Shard,
Keyspace: tc.Keyspace,
Position: vgtid.Gtid,
tc.Position = vgtid.Gtid
if vgtid.TablePKs != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to check the len() here as it could be empty and then we get a panic on the next line as there's no element at position 0.

tablePK := vgtid.TablePKs[0]
if tablePK != nil {
// Setting LastKnownPk allows a COPY phase to pick up where it left off
lastPK := tablePK.Lastpk
tc.LastKnownPk = lastPK
} else {
tc.LastKnownPk = nil
}
} else {
tc.LastKnownPk = nil
}
Comment on lines +326 to 331
Copy link

@mattlord mattlord Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could set it to nil before this outer block (after tc.Position = vgtid.Gtid) to simplify the code/branching.

}
case binlogdata.VEventType_LASTPK:
Expand Down Expand Up @@ -343,12 +361,13 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}
}

if waitForCopyCompleted && copyCompletedSeen {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since copy phase completed.", preamble))
// if isFullSync && copyCompletedSeen {
if isFullSync && copyCompletedSeen {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since copy phase completed or stop VGTID passed", preamble))
canFinishSync = true
}
if !waitForCopyCompleted && positionEqual(tc.Position, stopPosition) {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since stop position [%+v] found.", preamble, stopPosition))
if !isFullSync && positionEqual(tc.Position, stopPosition) {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sReady to finish sync and flush since stop position [%+v] found", preamble, stopPosition))
canFinishSync = true
}

Expand All @@ -373,10 +392,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}
}
}

}

func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
func (p PlanetScaleEdgeDatabase) getStopCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
defer p.Logger.Flush()
timeout := 45 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
Expand All @@ -394,7 +412,7 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
defer conn.Close()
}

vtgateReq := buildVStreamRequest(tabletType, s.Name, shard, keyspace, "current")
vtgateReq := buildVStreamRequest(tabletType, s.Name, shard, keyspace, "current", nil)
vtgateCursor, vtgateErr := vtgateClient.VStream(ctx, vtgateReq)

if vtgateErr != nil {
Expand Down Expand Up @@ -465,8 +483,8 @@ func (p PlanetScaleEdgeDatabase) printQueryResult(qr *sqltypes.Result, tableName
}
}

func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard string, keyspace string, gtid string) *vtgate.VStreamRequest {
return &vtgate.VStreamRequest{
func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard string, keyspace string, gtid string, lastKnownPk *query.QueryResult) *vtgate.VStreamRequest {
req := &vtgate.VStreamRequest{
TabletType: toTopoTabletType(tabletType),
Vgtid: &binlogdata.VGtid{
ShardGtids: []*binlogdata.ShardGtid{
Expand All @@ -488,6 +506,17 @@ func buildVStreamRequest(tabletType psdbconnect.TabletType, table string, shard
}},
},
}

if lastKnownPk != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes a bug: if req.Vgtid.ShardGtids[0].TablePKs is non nil, then copy phase is always started. Make sure to leave it nil if we're NOT intending to start a copy phase (lastKnownPk is nil)

req.Vgtid.ShardGtids[0].TablePKs = []*binlogdata.TableLastPK{
{
TableName: table,
Lastpk: lastKnownPk,
},
}
}

return req
}

// positionEqual returns true if position `a` is equal to or after position `b`
Expand Down
Loading
Loading