add example for Flight SQL server that supports JDBC driver #5138
@avantgardnerio may be interested in this.
I heavily referenced the Ballista implementation (and the arrow-rs Flight SQL example), so thank you @avantgardnerio! I linked to the Ballista implementation in the code comments in the example.
Ooh, foolish. This example creates a new SessionContext on every query. Moving to draft until I fix it.
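One common way to avoid building a context per query is to create it once and share it behind an `Arc`. The following is only a minimal sketch of that idea using the standard library: `SharedContext`, `FlightService`, and `handle_query` are hypothetical stand-ins, not the DataFusion `SessionContext` API and not necessarily how this PR fixed it.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a query context that remembers registered tables.
#[derive(Default)]
struct SharedContext {
    tables: Mutex<HashMap<String, String>>,
}

impl SharedContext {
    /// Record a table name and the path it was registered from.
    fn register_table(&self, name: &str, path: &str) {
        self.tables
            .lock()
            .unwrap()
            .insert(name.to_string(), path.to_string());
    }

    /// Number of tables registered so far.
    fn table_count(&self) -> usize {
        self.tables.lock().unwrap().len()
    }
}

/// Hypothetical service: the context is created once, in `new`.
#[derive(Clone)]
struct FlightService {
    ctx: Arc<SharedContext>,
}

impl FlightService {
    fn new() -> Self {
        Self {
            ctx: Arc::new(SharedContext::default()),
        }
    }

    /// Every query clones the Arc (a cheap pointer copy) instead of
    /// constructing a fresh context, so registrations stay visible.
    fn handle_query(&self, _sql: &str) -> Arc<SharedContext> {
        Arc::clone(&self.ctx)
    }
}
```

With this shape, a table registered before the first query is still visible to every later query, because all of them see the same underlying context.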
This is awesome -- thank you @kmitchener. I also intend to pull something into the arrow-rs repo too (likely cribbing from this as well).
let flight_data = batches_to_flight_data(schema, batches)
    .map_err(|e| status!("Could not convert batches", e))?
    .into_iter()
    .map(Ok);

let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>> =
    Box::pin(stream::iter(flight_data));
I think you might be able to use the relatively newly added FlightDataEncoderBuilder here to create the FlightData:
https://docs.rs/arrow-flight/32.0.0/arrow_flight/encode/struct.FlightDataEncoderBuilder.html
Pretty much like:
// Get an input stream of Result<RecordBatch, FlightError>
let input_stream = futures::stream::iter(batches.into_iter().map(Ok));
// Build a stream of `Result<FlightData>` (e.g. to return for do_get)
let flight_data_stream = FlightDataEncoderBuilder::new()
    .with_schema(schema)
    .build(input_stream);
tonic::Response::new(flight_data_stream);
I wasn't able to figure out how to use this :/ -- the result of FlightDataEncoderBuilder.build() is not accepted by tonic::Response::new(). My experience with futures and streams is pretty limited, so maybe I'm missing something.
I'll give it a shot later today -- thanks @kmitchener
I hit something like this recently. The difficult part is more the deeply nested type signatures than the async part IMO. I found some similar code and copy & paste solved it:
let output = futures::stream::iter(vec![Ok(PutResult {
    app_metadata: Default::default(),
})]);
return Ok(Response::new(Box::pin(output)));
The type signature for that function turns into something like:
Result<Response<Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + 'static>>>, Status>
So a result of a pinned box holding an implementation of a Stream of Results of either PutResults or Statuses. Not exactly straightforward; I think it took me an hour to figure it out in the original code.
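A type alias can make a signature like this easier to read. The sketch below is a standard-library-only illustration of the same shape, under loud assumptions: the local `Stream` trait only mirrors the shape of `futures::Stream`, and `PutResult`, `Status`, `Response`, `BoxStream`, `do_put`, and `collect` are simplified stand-ins rather than the tonic or arrow-flight types.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in that mirrors the shape of `futures::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A stream over an iterator that is always ready.
struct Iter<I>(I);

impl<I: Iterator + Unpin> Stream for Iter<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
}

// Simplified stand-ins for the real message, status, and response types.
#[derive(Debug, PartialEq)]
struct PutResult {
    app_metadata: Vec<u8>,
}
#[derive(Debug, PartialEq)]
struct Status(String);
struct Response<T>(T);

/// Name the nested "pinned box of a stream of results" type once.
type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>;

/// The handler signature now reads as "a response carrying a stream, or a status".
fn do_put() -> Result<Response<BoxStream<PutResult>>, Status> {
    let items: Vec<Result<PutResult, Status>> = vec![Ok(PutResult {
        app_metadata: Vec::new(),
    })];
    // The annotation triggers the coercion from the concrete stream to the trait object.
    let stream: BoxStream<PutResult> = Box::pin(Iter(items.into_iter()));
    Ok(Response(stream))
}

/// Waker that does nothing; enough to poll a stream that never returns Pending.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drain a boxed stream until it ends (or would block).
fn collect<T>(mut stream: BoxStream<T>) -> Vec<Result<T, Status>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        out.push(item);
    }
    out
}
```

The explicit `let stream: BoxStream<PutResult> = ...` binding is a deliberate choice: it gives the compiler one obvious place to coerce the concrete type into the trait object, which tends to produce clearer error messages than relying on inference through nested calls.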
@kmitchener here is a PR to this branch, kmitchener#1, that uses FlightDataEncoder (I agree the stream nonsense is quite tricky when it comes to understanding what the compiler is telling you).
That works! I think the other implementation was buggy as well, as it would sometimes not return the expected rows. I was in the midst of troubleshooting that, but your changes here resolved it.
ctx.register_parquet(
    "nadt_pq",
    "/home/kmitchener/dev/convert/test-parquet/part-0.parquet",
This seems like maybe it should be something less specific to your environment. Maybe it could be passed in as an environment variable?
copy/paste error :) I put it back to referencing test data, thanks
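For reference, the environment-variable suggestion above could look roughly like the sketch below. The variable name `EXAMPLE_PARQUET_PATH`, the fallback path, and both function names are hypothetical, chosen only for illustration; none of them come from the PR.

```rust
use std::env;

/// Hypothetical variable name; not taken from the PR.
const PATH_VAR: &str = "EXAMPLE_PARQUET_PATH";
/// Hypothetical fallback used when the variable is unset or empty.
const DEFAULT_PATH: &str = "testdata/example.parquet";

/// Pick the configured path if one was given, otherwise the default.
/// Kept separate from the environment lookup so it is easy to test.
fn resolve_path(configured: Option<String>) -> String {
    configured
        .filter(|p| !p.is_empty())
        .unwrap_or_else(|| DEFAULT_PATH.to_string())
}

/// Read the path from the environment, falling back to the default.
fn parquet_path() -> String {
    resolve_path(env::var(PATH_VAR).ok())
}
```

The result of `parquet_path()` would then be passed where the hard-coded string was, so the example no longer depends on one contributor's home directory.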
Other than the hard-coded path, this one looks ready to go.
impl ProstMessageExt for FetchResults {
    fn type_url() -> &'static str {
        "type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
I recommend using something not in the arrow namespace. Maybe replace
"type.googleapis.com/arrow.flight.protocol.sql.FetchResults"
with
"type.googleapis.com/datafusion.example.com.sql.FetchResults"
fixed
Thanks for the help @alamb!
Thanks @kmitchener
Benchmark runs are scheduled for baseline = 100665c and contender = 953d16b. 953d16b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #5139.
Rationale for this change
Adds an example of a standalone DataFusion server that can execute queries from tools like DBeaver or DataGrip via the JDBC Driver for Arrow Flight SQL.
What changes are included in this PR?
Just an example, and more dependencies in the datafusion-example module.
Are these changes tested?
Are there any user-facing changes?
No.