
s3 source with csv parser #31

Closed
wants to merge 4 commits

Conversation

waruto210

Support loading CSV-formatted data from Amazon S3 and other S3-compatible storage services.


A file is different from a message queue: a file is read as a series of byte chunks rather than as separate messages.

The parser should try to parse one record from the payload. If the payload does not contain enough bytes to complete a record, the parser buffers it internally and waits for more bytes to be passed in. If any error occurs, the parser should clear its internal buffer.
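A minimal sketch of this buffering behavior, using hypothetical names (`BufferedCsvParser`, `parse_record`) rather than the actual connector types:

```rust
/// Illustrative only: keeps partial input across calls and clears the buffer
/// on error, as described above.
struct BufferedCsvParser {
    buf: Vec<u8>, // bytes carried over from previous payloads
}

impl BufferedCsvParser {
    fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Try to parse one CSV record (one line) from `payload`.
    /// Returns `Ok(Some(fields))` on success, `Ok(None)` when more bytes are
    /// needed, and `Err(..)` on a malformed record (the buffer is cleared).
    fn parse_record(&mut self, payload: &[u8]) -> Result<Option<Vec<String>>, String> {
        self.buf.extend_from_slice(payload);
        // No record delimiter yet: keep the bytes buffered and wait for more.
        let Some(pos) = self.buf.iter().position(|&b| b == b'\n') else {
            return Ok(None);
        };
        let line: Vec<u8> = self.buf.drain(..=pos).collect();
        match std::str::from_utf8(&line[..line.len() - 1]) {
            Ok(s) => Ok(Some(s.split(',').map(|f| f.trim().to_string()).collect())),
            Err(e) => {
                // On error, discard any partially buffered input.
                self.buf.clear();
                Err(format!("invalid record: {e}"))
            }
        }
    }
}
```

For example, feeding `b"1,fo"` returns `Ok(None)` and keeps the bytes buffered; a later call with `b"o\n"` completes the record as `["1", "foo"]`.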
Member

+1 for the "statefulness". In the future, I suppose the parser should be stateful not only for buffering some input bytes, but also for storing necessary metadata read from the file's header (as with ORC or Parquet files). Hopefully, this design can live up to those requirements.
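As a rough illustration of that extra state (hypothetical names, not a concrete proposal), the parser could carry schema-like metadata discovered from the file itself, here a CSV header row, analogous to the footer/header metadata of ORC or Parquet:

```rust
/// Illustrative only: parser state that stores metadata read from the file
/// (a CSV header row) in addition to buffered bytes.
struct StatefulParserState {
    buf: Vec<u8>,                 // partially received bytes
    columns: Option<Vec<String>>, // column names parsed from the header row
}

impl StatefulParserState {
    /// Treat the first parsed record as the header and remember it;
    /// map every later record against the remembered column names.
    fn on_record(&mut self, fields: Vec<String>) -> Option<Vec<(String, String)>> {
        match &self.columns {
            None => {
                self.columns = Some(fields);
                None // the header row itself produces no data row
            }
            Some(cols) => Some(cols.iter().cloned().zip(fields).collect()),
        }
    }
}
```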

Member

In addition, in the future we should compose the parser inside the reader to make it more flexible. In that case, NexMark connectors can directly output DataChunk without the redundant serialization.
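A rough sketch of the "compose the parser inside the reader" idea, with hypothetical trait names (`ByteParser`, `ChunkSource`) and a placeholder `DataChunk`: a file reader feeds bytes through a parser it owns, while a generated source such as NexMark can implement the chunk-producing interface directly and skip the serialize-then-parse round trip.

```rust
/// Placeholder standing in for the engine's columnar batch type.
struct DataChunk;

/// Hypothetical: parses raw bytes into chunks (e.g. the CSV parser above).
trait ByteParser {
    fn parse(&mut self, payload: &[u8]) -> Vec<DataChunk>;
}

/// Hypothetical: anything that can hand chunks to the source executor.
trait ChunkSource {
    fn next_chunk(&mut self) -> Option<DataChunk>;
}

/// A file/S3 reader composes a parser internally...
struct FileReader<P: ByteParser> {
    parser: P,
    pending: Vec<DataChunk>,
}

impl<P: ByteParser> FileReader<P> {
    fn feed(&mut self, payload: &[u8]) {
        self.pending.extend(self.parser.parse(payload));
    }
}

impl<P: ByteParser> ChunkSource for FileReader<P> {
    fn next_chunk(&mut self) -> Option<DataChunk> {
        self.pending.pop()
    }
}

/// ...while a generator-style source (e.g. NexMark) builds `DataChunk`s
/// directly, with no intermediate byte serialization.
struct NexmarkSource;

impl ChunkSource for NexmarkSource {
    fn next_chunk(&mut self) -> Option<DataChunk> {
        Some(DataChunk)
    }
}
```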

```rust
    &'a mut self,
    payload: &'a mut &'b [u8],
    writer: SourceStreamChunkRowWriter<'c>,
) -> Self::ParseResult<'a>
```
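For context, a hedged reconstruction of the trait this signature might belong to: only the argument and return types above come from the diff, while the trait name, method name, and bounds are assumptions.

```rust
/// Placeholder for the writer type referenced in the snippet above.
pub struct SourceStreamChunkRowWriter<'a>(std::marker::PhantomData<&'a ()>);

/// Assumed shape of the parser trait; the generic associated type lets each
/// implementation choose its own (possibly async) result type that borrows
/// from `self`.
pub trait SourceParser {
    type ParseResult<'a>
    where
        Self: 'a;

    fn parse<'a, 'b, 'c>(
        &'a mut self,
        payload: &'a mut &'b [u8],
        writer: SourceStreamChunkRowWriter<'c>,
    ) -> Self::ParseResult<'a>
    where
        'b: 'a;
}
```

Taking `payload` as `&mut &[u8]` lets the parser advance the slice in place, i.e. consume exactly the bytes that formed a record and leave the rest for the next call.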
Member

By @hzxa21: We may consider implementing it like a streaming operator that takes in a stream of bytes and outputs a stream of events. By the way, our streaming code adopts futures_async_stream for this, which basically allows you to yield a DataChunk at any time while the compiler and macro handle the rest, such as keeping the state.

https://github.com/risingwavelabs/risingwave/blob/0128ff1b312ae22f51e979a73bd5cb9f61831c39/src/stream/src/executor/project_set.rs#L196
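A sketch of that futures_async_stream style, with placeholder types and made-up chunking logic; note the crate needs a nightly toolchain because it builds on unstable coroutine/generator support.

```rust
// Nightly-only: futures_async_stream relies on unstable coroutine support
// enabled at the crate root.
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;

struct DataChunk;   // placeholder for the real chunk type
struct ParserError; // placeholder error type

/// Hypothetical: turn a stream of byte payloads into a stream of DataChunks.
/// The macro generates the state machine, so `buf` simply lives across
/// `.await` and `yield` points without hand-written state handling.
#[try_stream(ok = DataChunk, error = ParserError)]
async fn parse_stream(mut payloads: impl Stream<Item = Vec<u8>> + Unpin) {
    let mut buf: Vec<u8> = Vec::new();
    while let Some(bytes) = payloads.next().await {
        buf.extend_from_slice(&bytes);
        // Pretend a chunk is ready once enough bytes have accumulated;
        // real logic would parse records and keep leftovers buffered.
        if buf.len() >= 1024 {
            buf.clear();
            yield DataChunk;
        }
    }
}
```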

Commit "rename header" (Co-authored-by: Eric Fu <[email protected]>)
@neverchanje

Isn't this feature implemented already? Will you finalize this document to reflect the up-to-date design?

@waruto210 closed this Feb 20, 2023