Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use VStream() #120

Merged
merged 31 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b1853c3
Adding VStream
notfelineit Feb 4, 2025
f885974
Split out
notfelineit Feb 4, 2025
7a6e7a4
Add vtgate service
notfelineit Feb 4, 2025
309f337
Update proto/vtgateservice.proto
notfelineit Feb 4, 2025
b2120e3
Update Makefile
notfelineit Feb 4, 2025
3f0fede
Run tidy
notfelineit Feb 4, 2025
21918b2
Merge branch 'use-vstream' of github.com:planetscale/airbyte-source i…
notfelineit Feb 4, 2025
c47c820
Try modifying getLatestCursorPosition to use vtgate
notfelineit Feb 4, 2025
d77e959
Reset protos
notfelineit Feb 4, 2025
06f5d6d
Get getLatestCursor working locally
notfelineit Feb 4, 2025
00a549f
Get local sync working
notfelineit Feb 4, 2025
772b9ae
Undo changes to makefile
notfelineit Feb 4, 2025
a873826
Refactor and reuse some logic, add helpful logs
notfelineit Feb 4, 2025
0baf971
Clean up logs more
notfelineit Feb 4, 2025
3c744d6
re run proto
notfelineit Feb 4, 2025
5b061ec
Update mock types and modify first few tests
notfelineit Feb 5, 2025
2238dcc
Convert more tests
notfelineit Feb 5, 2025
45f45d5
Handle cases of copy phase and first VGTID is past stop position, and…
notfelineit Feb 5, 2025
6c4e381
Remove some noisy logs
notfelineit Feb 5, 2025
4b8c976
Update test testing stopping at next VGtid after stop position
notfelineit Feb 5, 2025
375dbc8
Finish tests
notfelineit Feb 5, 2025
317637e
Remove unused mock types
notfelineit Feb 5, 2025
2c22667
Remove unused method
notfelineit Feb 5, 2025
3fffdc9
Simplify var names and logic
notfelineit Feb 6, 2025
0a6aed6
Clean up some more logs
notfelineit Feb 6, 2025
2f4559e
Handle case of no new VGTID found
notfelineit Feb 6, 2025
7026de9
Remove unused method
notfelineit Feb 10, 2025
cbf56ba
Remove extra spaces
notfelineit Feb 10, 2025
787467c
Address some PR comments
notfelineit Feb 10, 2025
02057d4
Clean up protos
notfelineit Feb 10, 2025
6689561
Add test for copy catchup
notfelineit Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions cmd/internal/mock_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package internal
import (
"context"
"database/sql"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"google.golang.org/grpc"
"io"

"google.golang.org/grpc"
"vitess.io/vitess/go/vt/proto/vtgate"
vtgateservice "vitess.io/vitess/go/vt/proto/vtgateservice"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting that this alias is not needed since it matches the package name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! 😄

)

type testAirbyteLogger struct {
Expand Down Expand Up @@ -51,29 +53,54 @@ func (testAirbyteLogger) Error(error string) {
panic("implement me")
}

type clientConnectionMock struct {
syncFn func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error)
syncFnInvoked bool
syncFnInvokedCount int
type vstreamClientMock struct {
vstreamFn func(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error)
vstreamFnInvoked bool
vstreamFnInvokedCount int
}

type connectSyncClientMock struct {
type vtgateVStreamClientMock struct {
lastResponseSent int
syncResponses []*psdbconnect.SyncResponse
vstreamResponses []*vtgate.VStreamResponse
grpc.ClientStream
}

func (x *connectSyncClientMock) Recv() (*psdbconnect.SyncResponse, error) {
if x.lastResponseSent >= len(x.syncResponses) {
func (x *vtgateVStreamClientMock) Recv() (*vtgate.VStreamResponse, error) {
if x.lastResponseSent >= len(x.vstreamResponses) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little unclear to me what lastResponseSent is tracking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I am pretty sure last response sent keeps track of which index of mock sync responses should be sent next!

return nil, io.EOF
}
x.lastResponseSent += 1
return x.syncResponses[x.lastResponseSent-1], nil
return x.vstreamResponses[x.lastResponseSent-1], nil
}

func (x *vstreamClientMock) CloseSession(context.Context, *vtgate.CloseSessionRequest, ...grpc.CallOption) (*vtgate.CloseSessionResponse, error) {
return nil, nil
}

func (x *vstreamClientMock) Execute(context.Context, *vtgate.ExecuteRequest, ...grpc.CallOption) (*vtgate.ExecuteResponse, error) {
return nil, nil
}

func (x *vstreamClientMock) ExecuteBatch(context.Context, *vtgate.ExecuteBatchRequest, ...grpc.CallOption) (*vtgate.ExecuteBatchResponse, error) {
return nil, nil
}

func (x *vstreamClientMock) Prepare(context.Context, *vtgate.PrepareRequest, ...grpc.CallOption) (*vtgate.PrepareResponse, error) {
return nil, nil
}

func (x *vstreamClientMock) ResolveTransaction(context.Context, *vtgate.ResolveTransactionRequest, ...grpc.CallOption) (*vtgate.ResolveTransactionResponse, error) {
return nil, nil
}

func (x *vstreamClientMock) StreamExecute(context.Context, *vtgate.StreamExecuteRequest, ...grpc.CallOption) (vtgateservice.Vitess_StreamExecuteClient, error) {
return nil, nil
}
func (c *clientConnectionMock) Sync(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
c.syncFnInvoked = true
c.syncFnInvokedCount += 1
return c.syncFn(ctx, in, opts...)

func (c *vstreamClientMock) VStream(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (vtgateservice.Vitess_VStreamClient, error) {
c.vstreamFnInvoked = true
c.vstreamFnInvokedCount += 1
return c.vstreamFn(ctx, in, opts...)
}

type mysqlAccessMock struct {
Expand Down
Loading
Loading