Merge branch 'main' into peng/meta-client
mergify[bot] authored Jan 30, 2023
2 parents 834c826 + 5e3070d commit ec1b0ef
Showing 40 changed files with 1,044 additions and 977 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

26 changes: 7 additions & 19 deletions src/batch/src/executor/source.rs
@@ -23,7 +23,7 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
 use risingwave_common::error::ErrorCode::{ConnectorError, ProtocolError};
 use risingwave_common::error::{Result, RwError};
 use risingwave_common::types::DataType;
-use risingwave_connector::parser::SourceParserImpl;
+use risingwave_connector::parser::SpecificParserConfig;
 use risingwave_connector::source::monitor::SourceMetrics;
 use risingwave_connector::source::{
     ConnectorProperties, SourceColumnDesc, SourceFormat, SourceInfo, SplitImpl, SplitMetaData,
@@ -68,7 +68,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
         let config = ConnectorProperties::extract(source_props)
             .map_err(|e| RwError::from(ConnectorError(e.into())))?;
 
-        let info = &source_node.get_info().unwrap();
+        let info = source_node.get_info().unwrap();
         let format = match info.get_row_format()? {
             RowFormatType::Json => SourceFormat::Json,
             RowFormatType::Protobuf => SourceFormat::Protobuf,
@@ -83,19 +83,9 @@ impl BoxedExecutorBuilder for SourceExecutor {
                 "protobuf file location not provided".to_string(),
             )));
         }
-        let source_parser_rs = SourceParserImpl::create(
-            &format,
-            &source_node.properties,
-            info.row_schema_location.as_str(),
-            info.use_schema_registry,
-            info.proto_message_name.clone(),
-        )
-        .await;
-        let parser = if let Ok(source_parser) = source_parser_rs {
-            source_parser
-        } else {
-            return Err(source_parser_rs.err().unwrap());
-        };
+
+        let parser_config =
+            SpecificParserConfig::new(format, info, &source_node.properties).await?;
 
         let columns: Vec<_> = source_node
             .columns
@@ -106,7 +96,7 @@ impl BoxedExecutorBuilder for SourceExecutor {
         let connector_source = ConnectorSource {
             config,
             columns,
-            parser,
+            parser_config,
             connector_message_buffer_size: source
                 .context()
                 .get_config()
@@ -163,7 +153,7 @@ impl Executor for SourceExecutor {
 impl SourceExecutor {
     #[try_stream(ok = DataChunk, error = RwError)]
     async fn do_execute(self: Box<Self>) {
-        let reader = self
+        let stream = self
             .connector_source
             .stream_reader(
                 Some(vec![self.split]),
@@ -173,8 +163,6 @@ impl SourceExecutor {
             )
             .await?;
 
-        let stream = reader.into_stream();
-
         #[for_await]
         for chunk in stream {
             match chunk {
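
The net effect in this file: the executor no longer constructs a parser up front and hands it to the source; it builds a SpecificParserConfig and stores that on ConnectorSource, and stream_reader now returns the message stream directly, so the separate into_stream() step disappears. A minimal sketch of that shape, with invented stand-in types rather than RisingWave's actual APIs:

// Hypothetical stand-ins for ConnectorSource / SpecificParserConfig; only the
// shape of the change is taken from the diff above.
#[derive(Clone)]
struct ParserConfig {
    format: String, // e.g. "json" or "protobuf"
}

struct Parser {
    format: String,
}

impl ParserConfig {
    // The real constructor is async and fallible; a plain build() keeps the
    // sketch self-contained.
    fn build(&self) -> Parser {
        Parser { format: self.format.clone() }
    }
}

struct ConnectorSource {
    parser_config: ParserConfig, // previously: a fully constructed parser
}

impl ConnectorSource {
    // Returns the message stream directly; callers no longer call a separate
    // into_stream() on a reader value.
    fn stream_reader(&self) -> impl Iterator<Item = String> {
        let parser = self.parser_config.build(); // parser built at read time
        ["m1", "m2"]
            .into_iter()
            .map(move |m| format!("{m} parsed as {}", parser.format))
    }
}

fn main() {
    let source = ConnectorSource {
        parser_config: ParserConfig { format: "json".into() },
    };
    for row in source.stream_reader() {
        println!("{row}");
    }
}

In the macros.rs changes below, the same deferral is visible where into_msg_stream calls ByteStreamSourceParserImpl::create(parser_config) inside the reader, so each split reader builds its own parser at stream time.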
158 changes: 151 additions & 7 deletions src/connector/src/macros.rs
@@ -105,25 +105,26 @@ macro_rules! impl_split {
 #[macro_export]
 macro_rules! impl_split_reader {
     ($({ $variant_name:ident, $split_reader_name:ident} ),*) => {
-        impl SplitReaderImpl {
-            pub fn into_stream(self) -> BoxSourceStream {
+        impl SplitReaderV2Impl {
+            pub fn into_stream(self) -> BoxSourceWithStateStream {
                 match self {
-                    $( Self::$variant_name(inner) => $crate::source::SplitReader::into_stream(*inner), )*
-                }
+                    $( Self::$variant_name(inner) => inner.into_stream(), )* }
             }
 
             pub async fn create(
                 config: ConnectorProperties,
                 state: ConnectorState,
+                parser_config: ParserConfig,
+                metrics: Arc<SourceMetrics>,
+                source_info: SourceInfo,
                 columns: Option<Vec<Column>>,
             ) -> Result<Self> {
                 if state.is_none() {
                     return Ok(Self::Dummy(Box::new(DummySplitReader {})));
                 }
-
+                let splits = state.unwrap();
                 let connector = match config {
-                    $( ConnectorProperties::$variant_name(props) => Self::$variant_name(Box::new(<$split_reader_name as $crate::source::SplitReader>::new(*props, state, columns).await?)), )*
-                    _ => todo!()
+                    $( ConnectorProperties::$variant_name(props) => Self::$variant_name(Box::new($split_reader_name::new(*props, splits, parser_config, metrics, source_info, columns).await?)), )*
                 };
 
                 Ok(connector)
@@ -158,3 +159,146 @@ macro_rules! impl_connector_properties {
         }
     }
 }
+
+#[macro_export]
+macro_rules! impl_common_parser_logic {
+    ($parser_name:ty) => {
+        impl $parser_name {
+            #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)]
+            async fn into_chunk_stream(self, data_stream: $crate::source::BoxSourceStream) {
+                #[for_await]
+                for batch in data_stream {
+                    let batch = batch?;
+                    let mut builder =
+                        $crate::parser::SourceStreamChunkBuilder::with_capacity(self.rw_columns.clone(), batch.len());
+                    let mut split_offset_mapping: std::collections::HashMap<$crate::source::SplitId, String> = std::collections::HashMap::new();
+
+                    for msg in batch {
+                        if let Some(content) = msg.payload {
+                            split_offset_mapping.insert(msg.split_id, msg.offset);
+
+                            let old_op_num = builder.op_num();
+
+                            if let Err(e) = self.parse_inner(content.as_ref(), builder.row_writer())
+                                .await
+                            {
+                                tracing::warn!("message parsing failed {}, skipping", e.to_string());
+                                continue;
+                            }
+
+                            let new_op_num = builder.op_num();
+
+                            // new_op_num - old_op_num is the number of rows added to the builder
+                            for _ in old_op_num..new_op_num {
+                                // TODO: support more kinds of SourceMeta
+                                if let $crate::source::SourceMeta::Kafka(kafka_meta) = msg.meta.clone() {
+                                    let f = |desc: &SourceColumnDesc| -> Option<risingwave_common::types::Datum> {
+                                        if !desc.is_meta {
+                                            return None;
+                                        }
+                                        match desc.name.as_str() {
+                                            "_rw_kafka_timestamp" => Some(
+                                                kafka_meta
+                                                    .timestamp
+                                                    .map(|ts| risingwave_expr::vector_op::cast::i64_to_timestamptz(ts).unwrap().into()),
+                                            ),
+                                            _ => unreachable!(
+                                                "kafka will not have this meta column: {}",
+                                                desc.name
+                                            ),
+                                        }
+                                    };
+                                    builder.row_writer().fulfill_meta_column(f)?;
+                                }
+                            }
+                        }
+                    }
+                    yield $crate::source::StreamChunkWithState {
+                        chunk: builder.finish(),
+                        split_offset_mapping: Some(split_offset_mapping),
+                    };
+                }
+            }
+        }
+
+        impl $crate::parser::ByteStreamSourceParser for $parser_name {
+            fn into_stream(self, data_stream: $crate::source::BoxSourceStream) -> $crate::source::BoxSourceWithStateStream {
+                self.into_chunk_stream(data_stream)
+            }
+        }
+
+    }
+}
+
+#[macro_export]
+macro_rules! impl_common_split_reader_logic {
+    ($reader:ty, $props:ty) => {
+        impl $reader {
+            #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = risingwave_common::error::RwError)]
+            pub(crate) async fn into_msg_stream(self) {
+                let parser_config = self.parser_config.clone();
+                let actor_id = self.source_info.actor_id.to_string();
+                let source_id = self.source_info.source_id.to_string();
+                let split_id = self.split_id.clone();
+                let metrics = self.metrics.clone();
+
+                let data_stream = self.into_data_stream();
+
+                let data_stream = data_stream
+                    .map_ok(move |data_batch| {
+                        metrics
+                            .partition_input_count
+                            .with_label_values(&[&actor_id, &source_id, &split_id])
+                            .inc_by(data_batch.len() as u64);
+                        let sum_bytes = data_batch
+                            .iter()
+                            .map(|msg| match &msg.payload {
+                                None => 0,
+                                Some(payload) => payload.len() as u64,
+                            })
+                            .sum();
+                        metrics
+                            .partition_input_bytes
+                            .with_label_values(&[&actor_id, &source_id, &split_id])
+                            .inc_by(sum_bytes);
+                        data_batch
+                    })
+                    .boxed();
+                let parser =
+                    $crate::parser::ByteStreamSourceParserImpl::create(parser_config)?;
+                #[for_await]
+                for msg_batch in parser.into_stream(data_stream) {
+                    yield msg_batch?;
+                }
+            }
+        }
+
+        #[async_trait]
+        impl $crate::source::SplitReaderV2 for $reader {
+            type Properties = $props;
+
+            async fn new(
+                properties: $props,
+                state: Vec<SplitImpl>,
+                parser_config: ParserConfig,
+                metrics: Arc<SourceMetrics>,
+                source_info: SourceInfo,
+                columns: Option<Vec<Column>>,
+            ) -> Result<Self> {
+                Self::new(
+                    properties,
+                    state,
+                    parser_config,
+                    metrics,
+                    source_info,
+                    columns,
+                )
+                .await
+            }
+
+            fn into_stream(self) -> $crate::source::BoxSourceWithStateStream {
+                self.into_msg_stream()
+            }
+        }
+    };
+}
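
To make the new macros concrete: impl_common_parser_logic! generates into_chunk_stream() and the ByteStreamSourceParser impl for a parser type, so the type itself only has to supply what the macro body uses, namely an rw_columns collection of SourceColumnDesc (cloned into the chunk builder) and an async parse_inner that writes one message's rows through the row writer. A hedged usage sketch; the parser type and its shape are invented for illustration, not shown in this diff:

// Hypothetical call site. MyJsonParser must provide, under these exact names:
//   - a field `rw_columns` (cloned into SourceStreamChunkBuilder::with_capacity), and
//   - an async fn parse_inner(&self, payload: &[u8], writer) -> Result<_>, which the
//     generated code invokes as `self.parse_inner(content.as_ref(), builder.row_writer()).await`.
impl_common_parser_logic!(MyJsonParser);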

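Likewise, impl_common_split_reader_logic! derives both the metrics-wrapping into_msg_stream() and the SplitReaderV2 trait impl for a connector reader. A sketch with invented names; the real invocations would live in each connector's module:

// Hypothetical connector types. Per the macro body above, MyReader must expose:
//   - fields parser_config, source_info, split_id, metrics (read in into_msg_stream),
//   - an inherent async fn new(props, splits, parser_config, metrics, source_info, columns)
//     that the generated SplitReaderV2::new delegates to, and
//   - fn into_data_stream(self) yielding batches of raw source messages.
impl_common_split_reader_logic!(MyReader, MyProperties);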