Skip to content

Commit

Permalink
Try modifying getLatestCursorPosition to use vtgate
Browse files Browse the repository at this point in the history
  • Loading branch information
notfelineit committed Feb 4, 2025
1 parent 21918b2 commit c47c820
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (

"github.com/pkg/errors"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/planetscale/airbyte-source/proto/vtgateservice"
"github.com/planetscale/psdb/auth"
grpcclient "github.com/planetscale/psdb/core/pool"
clientoptions "github.com/planetscale/psdb/core/pool/options"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
vtmysql "vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
)
Expand All @@ -35,9 +39,10 @@ type PlanetScaleDatabase interface {
// It uses the mysql interface provided by PlanetScale for all schema/shard/tablet discovery and
// the grpc API for incrementally syncing rows from PlanetScale.
type PlanetScaleEdgeDatabase struct {
Logger AirbyteLogger
Mysql PlanetScaleEdgeMysqlAccess
clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error)
Logger AirbyteLogger
Mysql PlanetScaleEdgeMysqlAccess
clientFn func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error)
vtgateClientFn func(ctx context.Context, ps PlanetScaleSource) (vtgateservice.VitessClient, error)
}

func (p PlanetScaleEdgeDatabase) CanConnect(ctx context.Context, psc PlanetScaleSource) error {
Expand Down Expand Up @@ -337,11 +342,11 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
var (
err error
client psdbconnect.ConnectClient
err error
vtgateClient vtgateservice.VitessClient
)

if p.clientFn == nil {
if p.vtgateClientFn == nil {
conn, err := grpcclient.Dial(ctx, ps.Host,
clientoptions.WithDefaultTLSConfig(),
clientoptions.WithCompression(true),
Expand All @@ -354,38 +359,52 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
return "", err
}
defer conn.Close()
client = psdbconnect.NewConnectClient(conn)
vtgateClient = vtgateservice.NewVitessClient(conn)
} else {
client, err = p.clientFn(ctx, ps)
vtgateClient, err = p.vtgateClientFn(ctx, ps)
if err != nil {
return "", err
}
}

sReq := &psdbconnect.SyncRequest{
TableName: s.Name,
Cursor: &psdbconnect.TableCursor{
Shard: shard,
Keyspace: keyspace,
Position: "current",
vtgateReq := &vtgateservice.VStreamRequest{
TabletType: topodata.TabletType(tabletType),
Vgtid: &binlogdata.VGtid{
ShardGtids: []*binlogdata.ShardGtid{
{
Shard: shard,
Keyspace: keyspace,
Gtid: "current",
},
},
},
Flags: &vtgateservice.VStreamFlags{
MinimizeSkew: true,
Cells: "planetscale_operator_default",
},
Filter: &binlogdata.Filter{
Rules: []*binlogdata.Rule{{
Match: s.Name,
Filter: "SELECT * FROM " + sqlescape.EscapeID(s.Name),
}},
},
TabletType: tabletType,
Cells: []string{"planetscale_operator_default"},
}

c, err := client.Sync(ctx, sReq)
if err != nil {
vtgateCursor, vtgateErr := vtgateClient.VStream(ctx, vtgateReq)

if vtgateErr != nil {
return "", nil
}

for {
res, err := c.Recv()
res, err := vtgateCursor.Recv()
if err != nil {
return "", err
}

if res.Cursor != nil {
return res.Cursor.Position, nil
if res.Events != nil {
lastEvent := res.Events[len(res.Events)-1]
return lastEvent.Gtid, nil
}
}
}
Expand Down

0 comments on commit c47c820

Please sign in to comment.