New gcp_big_query sink #1536

Open · binarylogic opened this issue Jan 17, 2020 · 23 comments

Labels: have: should · needs: approval · provider: gcp · sink: new · type: feature

Comments

@binarylogic (Contributor) commented Jan 17, 2020

GCP Big Query is a powerful service for analyzing large amounts of structured data. If used correctly, it can be cost-effective storage for log data. I would like to see Vector support this service as a sink, but it'll require careful planning due to the many different ways Big Query can be used.

Context

Big Query is flexible and we should consider the following features for our Big Query sink:

  1. Storing JSON data in a single column. Users can use Big Query's JSON functions.
    1. This type of querying is slower for obvious reasons (fetching more data, parsing, etc).
    2. This type of querying is more expensive because each query must fetch and scan the entire JSON payload as opposed to individual columns.
  2. Mapping structured data to individual Big Query table columns.
  3. Automatic schema maintenance.
  4. Streaming inserts vs batching inserts.
    1. Streaming inserts have a cost ($0.010 per 200 MB, i.e. roughly $0.05 per GB).
    2. Note the variety of methods to batch insert. This can be done through the API directly or through other GCP services (cloud storage, stackdriver, etc).

This is not an exhaustive list of the factors we should consider for Big Query, but it helps to demonstrate the variety of options.

Starting simple

v1 of this sink should solve the simplest implementation:

  • Use the streaming inserts API and stream records 1 by 1 (no batching).
  • Assume we are writing to a table with 2 columns: timestamp and json_event.
    • The timestamp column should map to our own internal timestamp column.
    • The json_event column should contain a JSON encoded representation of our event.
    • Both of these column names should be configurable (with the timestamp column defaulting to the global log_schema.timestamp_key). It is worth thinking about a generic column-mapping configuration scheme so that users could map other custom fields to Big Query columns. A rough configuration sketch follows this list.
  • Include documentation on how to create a properly structured table. Ideally this table would be partitioned by the timestamp day.
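
To make the v1 surface concrete, here is a minimal sketch of what the sink's configuration struct could look like, in the style of Vector's other sinks. Every name in it (GcpBigQuerySinkConfig, timestamp_column, json_event_column, credentials_path) is a hypothetical placeholder for discussion, not a settled name.

```rust
use serde::{Deserialize, Serialize};

/// Hypothetical v1 configuration for the proposed `gcp_big_query` sink.
/// Field names are illustrative only.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GcpBigQuerySinkConfig {
    /// GCP project, dataset, and table that rows are streamed into.
    pub project: String,
    pub dataset: String,
    pub table: String,

    /// Column receiving the event timestamp; defaults to the global
    /// `log_schema.timestamp_key` when unset.
    #[serde(default = "default_timestamp_column")]
    pub timestamp_column: String,

    /// Column receiving the JSON-encoded representation of the event.
    #[serde(default = "default_json_event_column")]
    pub json_event_column: String,

    /// Path to GCP service-account credentials, as in other `gcp_*` sinks.
    pub credentials_path: Option<String>,
}

fn default_timestamp_column() -> String {
    "timestamp".into()
}

fn default_json_event_column() -> String {
    "json_event".into()
}
```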

Long Term

We might consider the following features for long-term development:

  1. Support for using the batching API since it does not incur a cost.
  2. Dynamic schema maintenance, although I think this might be better solved with a transform or something separate.
binarylogic added the `type: new feature` and `needs: approval` labels on Jan 17, 2020
binarylogic added this to the "Initial GCP support" milestone on Jan 17, 2020
@bruceg (Member) commented Feb 27, 2020

I have this done and minimally tested against GCP, but I have a few questions.

We now have transforms that can alter the schema of the log records. Should this sink be writing the logs into the table as-is, or still follow the rewriting described in the initial issue (using a JSON encoded column)?

How important is being able to configure the column names within the BigQuery sink itself?

@binarylogic (Contributor, Author) commented

Good questions. I have a lot of thoughts on this, but I didn't want to distract from a simple first version. Here are a couple of ways this can work today:

  1. Encode the event as JSON into a single column.
  2. Front the sink with a coercer transform with drop_unspecified enabled and ensure the Big Query table columns match exactly. The new schema transform (#165) can serve this purpose as well.

Long term I would like to improve the UX by:

  1. Treating the Big Query table schema as the source of truth and using that schema to encode events. One idea is to reactively refresh this schema when we receive specific schema related insert errors.
  2. Treating another schema source (registry) as the source of truth, such as a JSON schema file. This would pair nicely with #1695 (Investigate file descriptor use with tokio 0.2).
  3. And finally, treating the event schema as the source of truth and dynamically changing the Big Query table schema as fields are discovered. This requires quite a bit more discussion and it's likely we won't do this.

Hoverbear linked a pull request on Mar 10, 2020 that will close this issue
binarylogic added the `type: feature` label and removed the `type: new feature` label on Jun 16, 2020
binarylogic removed this from the "Initial GCP support" milestone on Jul 26, 2020
binarylogic added the `have: should` and `provider: gcp` labels on Aug 7, 2020
@jszwedko (Member) commented

Just noting that a user asked for this in gitter today: https://gitter.im/timberio-vector/community?at=5f3d3d31582470633b670c43

@seeyarh (Contributor) commented Jul 21, 2021

Any update on this? I prototyped a simple implementation here https://github.com/seeyarh/tobq/blob/main/src/lib.rs
It uses the following library: https://github.com/lquerel/gcp-bigquery-client

I'd like to work on this sink, if possible.

@jszwedko (Member) commented

@seeyarh nice! That seems like a good start. This issue hasn't been scheduled yet, but we are happy to review a PR to Vector if you wanted to try implementing it.

@gartemiev commented

@jszwedko @seeyarh @binarylogic, any news on this? We are actively migrating our data warehouse to Google BigQuery and would like to have this feature.

@jszwedko (Member) commented

Hi @gartemiev ! Nothing yet unfortunately.

@JoHuang commented Mar 24, 2022

https://cloud.google.com/blog/products/data-analytics/bigquery-now-natively-supports-semi-structured-data
With this support, maybe we can simplify the schema and treat the data as a single field of the JSON type.

jszwedko added the `sink: new` label on Aug 1, 2022
@gartemiev commented

@jszwedko @seeyarh @binarylogic

Is there any news on when this will be available, approximately?

@spencergilbert (Contributor) commented

> Is there any news on when this will be available, approximately?

It's currently not on our roadmap, but we'd be open to discussing a community contribution if anyone was interested in working on that.

@goakley (Contributor) commented Aug 14, 2023

@jszwedko @spencergilbert I would like to pick this up, as we have also stumbled into a need to write directly to BigQuery. I get the sense that this would be similar to other gcp and HttpSinks since we can leverage the tabledata.insertAll API call. There is a more "featureful" gRPC API for writing to tables, but that would definitely be a heavier lift given that there is no official Rust gRPC client for GCP. Any thoughts on this? Happy to propose an implementation in a PR if that's easier.
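
For reference, here's a minimal sketch of the shape of the tabledata.insertAll request body that such an HTTP-based sink would build per batch. The struct names are made up for illustration; the JSON field names (skipInvalidRows, ignoreUnknownValues, insertId, json) follow my reading of the public REST API, so they should be double-checked against the current docs.

```rust
use serde::Serialize;
use serde_json::{json, Value};

/// Body of a `tabledata.insertAll` call, POSTed to
/// `/bigquery/v2/projects/{project}/datasets/{dataset}/tables/{table}/insertAll`.
/// Struct names are hypothetical; JSON field names follow the REST API.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct InsertAllRequest {
    skip_invalid_rows: bool,
    ignore_unknown_values: bool,
    rows: Vec<InsertAllRow>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct InsertAllRow {
    /// Optional dedupe key for best-effort de-duplication.
    #[serde(skip_serializing_if = "Option::is_none")]
    insert_id: Option<String>,
    /// The row itself, keyed by column name.
    json: Value,
}

fn main() {
    // One log event mapped to the `timestamp` + `json_event` layout
    // discussed earlier in this issue.
    let request = InsertAllRequest {
        skip_invalid_rows: false,
        ignore_unknown_values: false,
        rows: vec![InsertAllRow {
            insert_id: None,
            json: json!({
                "timestamp": "2023-08-14T12:00:00Z",
                "json_event": "{\"message\":\"hello\"}",
            }),
        }],
    };
    println!("{}", serde_json::to_string_pretty(&request).unwrap());
}
```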

@neuronull (Contributor) commented

> @jszwedko @spencergilbert I would like to pick this up, as we have also stumbled into a need to write directly to BigQuery. I get the sense that this would be similar to other gcp and HttpSinks since we can leverage the tabledata.insertAll API call. There is a more "featureful" gRPC API for writing to tables, but that would definitely be a heavier lift given that there is no official Rust gRPC client for GCP. Any thoughts on this? Happy to propose an implementation in a PR if that's easier.

Hi @goakley , thanks for letting us know you're interested in working on this.

I think right now it's OK to chat about the approach first, before implementing. We also have a formal process for evaluating newly proposed components, from which we may draw some questions to ask you. There has already been solid demand for this component, which helps.

Last December I took a look at this, and the main concern is that Google seems to really be steering new development toward the Storage Write API (gRPC based). It definitely would be a heavier lift, that is for sure. I think the trade-off worth evaluating is whether we would get some nice performance improvements with the gRPC-based API; also, if support is dropped for the legacy API, that could imply a rewrite to the gRPC-based API down the line.

@goakley (Contributor) commented Aug 29, 2023

Thank you for the feedback @neuronull. Using the gRPC interface requires a more complicated setup, but I believe it can be done as follows:

In addition to the "usual" sink settings (batch size, credentials, etc), users will be required to specify the project, dataset, and table name for the table in which they want to store their data, plus a representation of the table schema, which is necessary for calling the AppendRows method (i.e. we can't write arbitrary blobs of data). I believe the representation of the schema in the config file should be protobuf, as it's (a) the representation by which you can define the schema in BigQuery itself and (b) easy for us to parse in Rust and convert to the data types used by the gRPC API. We could support either a large protobuf string in the config file, or point to another .proto file from the config file. I'm not sure what's preferable here. Maybe supporting both?

By default, Vector will write to the table's default stream. In the future, we could extend the sink, adding a new config setting that allows writing to an application-created stream. I would prefer to keep the initial implementation simple and not add application-created streams at the start. Google themselves recommend using the default stream when possible; however, it does not support exactly-once write semantics, which some use cases might require. In any case, Vector will not manage streams - streams should be created independently and then provided to Vector via the config file.

Otherwise, the flow here is predictable. Data reaches the sink, gets transformed into the correct structure as specified by the config's protobuf schema, and is streamed to BigQuery with the usual backoff/retry mechanics that Vector uses in other sinks. A rough sketch of the configuration shape I have in mind follows.
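
To make that concrete, here is a rough Rust sketch of the settings described above. All of the names (BigQuerySinkConfig, schema_proto, schema_proto_file, stream, credentials_path) are hypothetical and only illustrate the shape being proposed, not a final design.

```rust
use serde::{Deserialize, Serialize};

/// Hypothetical configuration for a Storage Write API (gRPC) based sink,
/// mirroring the proposal above. Names are illustrative only.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct BigQuerySinkConfig {
    /// Target table identifiers, required to address `AppendRows`.
    pub project: String,
    pub dataset: String,
    pub table: String,

    /// Inline protobuf definition of the row schema. Exactly one of this
    /// or `schema_proto_file` would be expected.
    pub schema_proto: Option<String>,
    /// Path to a `.proto` file describing the row schema.
    pub schema_proto_file: Option<String>,

    /// Optional application-created stream to write to; when unset, the
    /// sink writes to the table's default stream.
    pub stream: Option<String>,

    /// Path to GCP service-account credentials, as in other `gcp_*` sinks.
    pub credentials_path: Option<String>,
}
```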

@neuronull (Contributor) commented Sep 1, 2023

Hey @goakley, thanks for the follow-up. Ah yes, needing to supply the schema. I remember that now.

What you outlined makes sense to me, and I agree with keeping it simple and starting with the default stream (though perhaps wiring it up with the possibility in mind that the default stream may not always be the one used).

> We could support either a large protobuf string in the config file, or point to another .proto file from the config file. I'm not sure what's preferable here. Maybe supporting both?

I would opt for the file-path approach to keep the Vector config cleaner. We recently had a contribution of a protobuf codec that does this for desc_file. That seems like a useful pattern to re-use here (see #18019).

@goakley (Contributor) commented Sep 18, 2023

Thanks for the nudge in the right direction @neuronull. I've split out the protobuf-specific serialization logic into its own PR: #18598. Once that's merged, I will follow up with a proposed BigQuery sink implementation.

@goakley (Contributor) commented Oct 4, 2023

Hey @neuronull, I've been reading through the Vector code and trying some stuff out. Of particular interest are the facts that (1) BigQuery's AppendRows call is a streaming service and (2) it returns row-specific errors, plus (3) the "new" StreamSink API in the Vector codebase.

(1) By calling AppendRows, tonic establishes a long-lived connection over which a chunk of rows can be sent, after which a response will be streamed back. AFAIK all Vector sinks are tower::Services (as enforced by the Driver contract), which are based around a one-request-one-response model. I can certainly wrap the AppendRows logic to make the stream look like a request-response interface, but I wanted to make sure I'm not missing some Vector functionality that would allow for a more flexible streaming-based sink model. (A rough sketch of the kind of wrapper I mean follows this list.)

(2) AppendRows responses will indicate if individual rows are the cause of a write failure. When the problem is with individual rows, no rows from the request are written, but the request can be tried again with the bad rows omitted, and that follow-up request is expected to succeed. This sort of behaviour doesn't seem to fit the Vector model, which assumes a single request (batch of events) will either succeed or fail in full. Again, I can finagle some logic to retry requests under the hood of the tower::Service, but I wanted to see if I missed finding some backpressure/reprocessing logic in the Driver itself.

(3) The StreamSink API is quite powerful thanks to SinkBuilderExt. I've been referencing the datadog/traces logic as an example for BigQuery, which uses that sink type. Is this the way to build sinks going forward, or are other methods still supported? I've run into some absolutely bonkers rust errors while prototyping this out ("higher-ranked lifetime error" anyone?), so I wanted to make sure this is the right path before I go deep.
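
Coming back to (1): here's a rough sketch of the wrapper shape I have in mind, using stand-in types. The AppendRowsBatch/AppendRowsOutcome types below are placeholders for the tonic-generated Storage Write API types, and the service body is stubbed; this only illustrates how the streaming call could be hidden behind the one-request-one-response tower::Service model, not a working integration.

```rust
use std::task::{Context, Poll};

use futures::future::BoxFuture;
use tower::Service;

/// Stand-ins for the tonic-generated Storage Write API types; the real
/// request/response types would come from the generated gRPC bindings.
#[derive(Clone, Debug)]
pub struct AppendRowsBatch {
    pub serialized_rows: Vec<Vec<u8>>,
}

#[derive(Debug)]
pub struct AppendRowsOutcome {
    pub rows_written: usize,
}

#[derive(Debug)]
pub struct BigQueryError(pub String);

/// Hypothetical service that hides the bidirectional AppendRows stream
/// behind the one-request-one-response model the Driver expects: each
/// `call` sends one batch and resolves when its matching response arrives.
#[derive(Clone)]
pub struct BigQueryService {
    // A real implementation would hold the tonic client / open stream here.
}

impl Service<AppendRowsBatch> for BigQueryService {
    type Response = AppendRowsOutcome;
    type Error = BigQueryError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Always ready in this sketch; a real service would check the stream state.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, batch: AppendRowsBatch) -> Self::Future {
        let rows = batch.serialized_rows.len();
        Box::pin(async move {
            // A real implementation would write the batch onto the gRPC stream
            // here and await the corresponding AppendRowsResponse.
            Ok(AppendRowsOutcome { rows_written: rows })
        })
    }
}
```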

@neuronull (Contributor) commented

👋 Hey @goakley, those are good questions to be asking. Sorry I didn't get to respond earlier. I will get back to you on this on Monday.

@neuronull (Contributor) commented Oct 16, 2023

> (1) I can certainly wrap the AppendRows logic to make the stream look like a request-response interface, but I wanted to make sure I'm not missing some Vector functionality that would allow for a more flexible streaming-based sink model.

Indeed, most of the sinks we have are on that request-response model. An example of one that differs is the websocket sink. We don't really have the same level of scaffolding/interfaces set up for this model as we do for the request-response one. Another potentially helpful reference might be the vector sink, where we also use tonic to connect to a downstream Vector instance and call push_events.

> (2) but the request can be tried again with the bad rows omitted, and that follow-up request is expected to succeed. This sort of behaviour doesn't seem to fit the Vector model, which assumes a single request (batch of events) will either succeed or fail in full. Again, I can finagle some logic to retry requests under the hood of the tower::Service, but I wanted to see if I missed finding some backpressure/reprocessing logic in the Driver itself.

We do have some infrastructure here for retrying requests. A RetryLogic implementation can be passed into a ServiceBuilder. Doing so gives you the ability to implement is_retriable_error() and should_retry_response(), where the Response can be inspected to see whether it's the type of write failure caused by individual rows.

See

```rust
pub trait RetryLogic: Clone + Send + Sync + 'static {
    type Error: std::error::Error + Send + Sync + 'static;
    type Response;

    fn is_retriable_error(&self, error: &Self::Error) -> bool;

    fn should_retry_response(&self, _response: &Self::Response) -> RetryAction {
        // Treat the default as the request is successful
        RetryAction::Successful
    }
}
```

and

```rust
let service = ServiceBuilder::new()
    .settings(request_limits, S3RetryLogic)
    .service(service);
```
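
As a rough illustration (not working code), a RetryLogic implementation for this sink could look something like the sketch below. The BigQueryResponse type and its has_row_errors field are hypothetical stand-ins for whatever the service ends up returning, and the import path and RetryAction variant names should be double-checked against the current retries module.

```rust
use crate::sinks::util::retries::{RetryAction, RetryLogic};

/// Hypothetical response type for the BigQuery service; the real type would
/// wrap whatever the insert/append call actually returns.
#[derive(Debug, Clone)]
pub struct BigQueryResponse {
    /// Whether the response flagged individual bad rows.
    pub has_row_errors: bool,
}

#[derive(Debug, Clone, Copy)]
pub struct BigQueryRetryLogic;

impl RetryLogic for BigQueryRetryLogic {
    // Placeholder error type; a real sink would define its own error enum.
    type Error = std::io::Error;
    type Response = BigQueryResponse;

    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
        // Transient transport failures would be classified here.
        true
    }

    fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
        if response.has_row_errors {
            // Row-level failures: nothing was written, so the batch can be
            // retried (ideally with the offending rows dropped or re-encoded).
            RetryAction::Retry("row-level insert errors".into())
        } else {
            RetryAction::Successful
        }
    }
}
```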

> (3) Is this the way to build sinks going forward, or are other methods still supported?

Yes the StreamSink variant is the only acceptable path forward. We are slowly making progress on converting all the "old style" sinks to the new style (#9261).

ICYMI, there is a detailed tutorial for new-style sinks here: https://github.com/vectordotdev/vector/tree/master/docs/tutorials/sinks. It does center on the request-response model, but I think it does a good job of explaining the infrastructure that is used.

> I've run into some absolutely bonkers rust errors while prototyping this out ("higher-ranked lifetime error" anyone?)

😅 There are definitely some obscure compilation errors that can crop up, and they can be tricky sometimes to track down. If you get stuck, you're welcome to share your branch and I can take a look at the error(s). I have done that in the past for others.

@goakley (Contributor) commented Oct 19, 2023

Thank you @neuronull, that is helpful! I've tried to keep things relatively simple in this initial PR, which does function as expected: #18886. Adding RetryLogic would probably be a good next step here, depending on the feedback. At a high level, the websocket and vector sinks seem kind of straightforward, but they don't quite match (or combine into) exactly how the BigQuery API works - I've implemented a less-optimal way of writing to BigQuery in exchange for greater sink simplicity, which may or may not be the right call.

(Oh, and the higher-ranked lifetime error was because vector_common::Error is a boxed dyn that I buried deep in nested futures)

@neuronull (Contributor) commented

Awesome! I'll queue that PR up for review~

@neuronull (Contributor) commented

👋 I haven't given that a deep review yet, but I looked deep enough to see the manifestation of this:

> I've implemented a less-optimal way of writing to BigQuery in exchange for greater sink simplicity, which may or may not be the right call.

That is the main thing I think needs deciding before diving deeper into the review.

Fully utilizing the gRPC stream service model that the Storage Write API offers would mean going somewhat against the unary request/response model that the Vector stream sink driver framework relies on (which we've touched on a bit already).

I think it's still possible to do it though. I do see your point about it being a tradeoff with complexity.

My bad for pointing to the vector sink, which doesn't utilize the stream service model. A better reference point would probably be the gcp_pubsub source, which does leverage the gRPC stream service model. As you can/will see, the code is definitely more involved; it means setting up an async tokio runtime and managing things in there. Of course, the pubsub source is not a sink, so it doesn't have to fit into the constraints of the StreamSink requirements, but that is where the websocket sink kind of comes into play.

I am curious what the performance of the current design is. It's one thing to have a perhaps more robust design leveraging the stream service model, but the argument for it would be stronger if the performance of the current model were noticeably poor.

@goakley (Contributor) commented Jan 5, 2024

@neuronull I'm not sure what performance we're looking for, but my team is currently using this branch in production (don't tell on us) to push an average of 3.3k events per second to BigQuery from a single gcp_bigquery sink in a single container. (We're running 32 of them, for a throughput of 108k/sec.) We aren't seeing any dropped messages or buffers filling up from this. As expected, there's no real CPU impact since this is all in the networking.

@neuronull (Contributor) commented

I'm glad to hear it is working well for your use case; thank you for sharing that (🙈).

I raised this with the team, and we have a proposal for you, @goakley:

We are working on a formalized process for community-driven components in Vector. But in the interim, would you / your company be willing to informally volunteer ownership of the component in its current state? That essentially implies that, aside from routine maintenance, the Vector team would be "hands-off" on this component, relying on you and your team for bug fixes reported by the community, etc. For an example of that, you can see the history of the appsignal sink.

In the future, we may want to adopt this into a "core" Vector component that we would re-assume ownership of, at which time we could further investigate the stream service approach that we've been discussing in this thread.

If this is agreeable to you/your company, we will dive into your PR for in-depth code review. How does that sound?
