From a5a0d3a3c532bcdb2e46adb435338a28dd4a85e4 Mon Sep 17 00:00:00 2001 From: lmatz Date: Thu, 22 Dec 2022 17:04:05 +0800 Subject: [PATCH] correctly handle skip_parse --- src/source/src/parser/avro/parser.rs | 2 +- src/source/src/parser/canal/json_parser.rs | 6 +++--- src/source/src/parser/canal/simd_json_parser.rs | 6 +++--- src/source/src/parser/csv_parser.rs | 2 +- src/source/src/parser/debezium/json_parser.rs | 6 +++--- src/source/src/parser/debezium/simd_json_parser.rs | 6 +++--- src/source/src/parser/json_parser.rs | 2 +- src/source/src/parser/maxwell/json_parser.rs | 6 +++--- src/source/src/parser/maxwell/simd_json_parser.rs | 6 +++--- src/source/src/parser/mod.rs | 10 +++++----- src/source/src/parser/native_parser.rs | 4 +--- src/source/src/parser/protobuf/parser.rs | 2 +- src/stream/src/executor/source/source_executor.rs | 2 +- src/stream/src/executor/source/source_executor_v2.rs | 4 ++-- 14 files changed, 31 insertions(+), 33 deletions(-) diff --git a/src/source/src/parser/avro/parser.rs b/src/source/src/parser/avro/parser.rs index ebbcdcce25cca..1f6770a4e64b2 100644 --- a/src/source/src/parser/avro/parser.rs +++ b/src/source/src/parser/avro/parser.rs @@ -205,7 +205,7 @@ impl AvroParser { }; // parse the valur to rw value if let Value::Record(fields) = avro_value { - writer.insert(|column| { + writer.insert(|_idx, column| { let tuple = fields.iter().find(|val| column.name.eq(&val.0)).unwrap(); from_avro_value(tuple.1.clone()).map_err(|e| { tracing::error!( diff --git a/src/source/src/parser/canal/json_parser.rs b/src/source/src/parser/canal/json_parser.rs index 09a74b50f297a..12c7d4d612dfc 100644 --- a/src/source/src/parser/canal/json_parser.rs +++ b/src/source/src/parser/canal/json_parser.rs @@ -76,7 +76,7 @@ impl CanalJsonParser { let results = inserted .map(|v| { - writer.insert(|columns| { + writer.insert(|_idx, columns| { canal_json_parse_value(&columns.data_type, v.get(&columns.name)) .map_err(Into::into) }) @@ -106,7 +106,7 @@ impl CanalJsonParser { let results = before .zip(after) .map(|(before, after)| { - writer.update(|column| { + writer.update(|_idx, column| { // in origin canal, old only contains the changed columns but data // contains all columns. // in ticdc, old contains all fields @@ -133,7 +133,7 @@ impl CanalJsonParser { })?; let results = deleted .map(|v| { - writer.delete(|columns| { + writer.delete(|_idx, columns| { canal_json_parse_value(&columns.data_type, v.get(&columns.name)) .map_err(Into::into) }) diff --git a/src/source/src/parser/canal/simd_json_parser.rs b/src/source/src/parser/canal/simd_json_parser.rs index d8c461e450342..4a9fdb446b7da 100644 --- a/src/source/src/parser/canal/simd_json_parser.rs +++ b/src/source/src/parser/canal/simd_json_parser.rs @@ -79,7 +79,7 @@ impl CanalJsonParser { let results = inserted .into_iter() .map(|v| { - writer.insert(|column| { + writer.insert(|_idx, column| { cannal_simd_json_parse_value( &column.data_type, v.get(column.name.as_str()), @@ -117,7 +117,7 @@ impl CanalJsonParser { let results = before .zip_eq(after) .map(|(before, after)| { - writer.update(|column| { + writer.update(|_idx, column| { // in origin canal, old only contains the changed columns but data // contains all columns. // in ticdc, old contains all fields @@ -151,7 +151,7 @@ impl CanalJsonParser { let results = deleted .into_iter() .map(|v| { - writer.delete(|column| { + writer.delete(|_idx, column| { cannal_simd_json_parse_value( &column.data_type, v.get(column.name.as_str()), diff --git a/src/source/src/parser/csv_parser.rs b/src/source/src/parser/csv_parser.rs index 33bc8b836f64d..f970b016cc363 100644 --- a/src/source/src/parser/csv_parser.rs +++ b/src/source/src/parser/csv_parser.rs @@ -130,7 +130,7 @@ impl CsvParser { Some(strings) => strings, }; writer - .insert(move |desc| { + .insert(move |_idx, desc| { let column_id = desc.column_id.get_id(); let column_type = &desc.data_type; let v = match columns_string.get(column_id as usize) { diff --git a/src/source/src/parser/debezium/json_parser.rs b/src/source/src/parser/debezium/json_parser.rs index 8fcdaf1bc03dd..42896215034ba 100644 --- a/src/source/src/parser/debezium/json_parser.rs +++ b/src/source/src/parser/debezium/json_parser.rs @@ -65,7 +65,7 @@ impl DebeziumJsonParser { )) })?; - writer.update(|column| { + writer.update(|_idx, column| { let before = json_parse_value(&column.data_type, before.get(&column.name))?; let after = json_parse_value(&column.data_type, after.get(&column.name))?; @@ -79,7 +79,7 @@ impl DebeziumJsonParser { )) })?; - writer.insert(|column| { + writer.insert(|_idx, column| { json_parse_value(&column.data_type, after.get(&column.name)).map_err(Into::into) }) } @@ -90,7 +90,7 @@ impl DebeziumJsonParser { )) })?; - writer.delete(|column| { + writer.delete(|_idx, column| { json_parse_value(&column.data_type, before.get(&column.name)) .map_err(Into::into) }) diff --git a/src/source/src/parser/debezium/simd_json_parser.rs b/src/source/src/parser/debezium/simd_json_parser.rs index ed6318d4a1997..a2f533dd94bf8 100644 --- a/src/source/src/parser/debezium/simd_json_parser.rs +++ b/src/source/src/parser/debezium/simd_json_parser.rs @@ -79,7 +79,7 @@ impl DebeziumJsonParser { )) })?; - writer.update(|column| { + writer.update(|_idx, column| { let before = simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))?; let after = @@ -98,7 +98,7 @@ impl DebeziumJsonParser { )) })?; - writer.insert(|column| { + writer.insert(|_idx, column| { simd_json_parse_value(&column.data_type, after.get(column.name.as_str())) .map_err(Into::into) }) @@ -113,7 +113,7 @@ impl DebeziumJsonParser { )) })?; - writer.delete(|column| { + writer.delete(|_idx, column| { simd_json_parse_value(&column.data_type, before.get(column.name.as_str())) .map_err(Into::into) }) diff --git a/src/source/src/parser/json_parser.rs b/src/source/src/parser/json_parser.rs index e671a57c9f295..bbc8b4705e850 100644 --- a/src/source/src/parser/json_parser.rs +++ b/src/source/src/parser/json_parser.rs @@ -97,7 +97,7 @@ impl JsonParser { let value: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - writer.insert(|desc| { + writer.insert(|_idx, desc| { simd_json_parse_value(&desc.data_type, value.get(desc.name.as_str())).map_err(|e| { tracing::error!( "failed to process value ({}): {}", diff --git a/src/source/src/parser/maxwell/json_parser.rs b/src/source/src/parser/maxwell/json_parser.rs index 3b5f9f07c9caf..b2c55c95fb30a 100644 --- a/src/source/src/parser/maxwell/json_parser.rs +++ b/src/source/src/parser/maxwell/json_parser.rs @@ -54,7 +54,7 @@ impl MaxwellParser { "data is missing for creating event".to_string(), )) })?; - writer.insert(|column| { + writer.insert(|_idx, column| { json_parse_value(&column.data_type, after.get(&column.name)).map_err(Into::into) }) } @@ -70,7 +70,7 @@ impl MaxwellParser { )) })?; - writer.update(|column| { + writer.update(|_idx, column| { // old only contains the changed columns but data contains all columns. let before_value = before .get(column.name.as_str()) @@ -84,7 +84,7 @@ impl MaxwellParser { let before = event.after.as_ref().ok_or_else(|| { RwError::from(ProtocolError("old is missing for delete event".to_string())) })?; - writer.delete(|column| { + writer.delete(|_idx, column| { json_parse_value(&column.data_type, before.get(&column.name)) .map_err(Into::into) }) diff --git a/src/source/src/parser/maxwell/simd_json_parser.rs b/src/source/src/parser/maxwell/simd_json_parser.rs index 445fe5da8d96f..84074c77ec0d8 100644 --- a/src/source/src/parser/maxwell/simd_json_parser.rs +++ b/src/source/src/parser/maxwell/simd_json_parser.rs @@ -53,7 +53,7 @@ impl MaxwellParser { "data is missing for creating event".to_string(), )) })?; - writer.insert(|column| { + writer.insert(|_idx, column| { simd_json_parse_value(&column.data_type, after.get(column.name.as_str())) .map_err(Into::into) }) @@ -70,7 +70,7 @@ impl MaxwellParser { )) })?; - writer.update(|column| { + writer.update(|_idx, column| { // old only contains the changed columns but data contains all columns. let before_value = before .get(column.name.as_str()) @@ -85,7 +85,7 @@ impl MaxwellParser { let before = event.get(AFTER).ok_or_else(|| { RwError::from(ProtocolError("old is missing for delete event".to_string())) })?; - writer.delete(|column| { + writer.delete(|_idx, column| { simd_json_parse_value(&column.data_type, before.get(column.name.as_str())) .map_err(Into::into) }) diff --git a/src/source/src/parser/mod.rs b/src/source/src/parser/mod.rs index 1f2779caed806..6735dfc48a4ef 100644 --- a/src/source/src/parser/mod.rs +++ b/src/source/src/parser/mod.rs @@ -189,7 +189,7 @@ impl OpAction for OpActionUpdate { impl SourceStreamChunkRowWriter<'_> { fn do_action( &mut self, - mut f: impl FnMut(&SourceColumnDesc) -> Result, + mut f: impl FnMut(usize, &SourceColumnDesc) -> Result, ) -> Result { // The closure `f` may fail so that a part of builders were appended incompletely. // Loop invariant: `builders[0..appended_idx)` has been appended on every iter ended or loop @@ -204,7 +204,7 @@ impl SourceStreamChunkRowWriter<'_> { let output = if desc.skip_parse { A::DEFAULT_OUTPUT } else { - f(desc)? + f(idx, desc)? }; A::apply(builder, output); appended_idx = idx + 1; @@ -231,7 +231,7 @@ impl SourceStreamChunkRowWriter<'_> { /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. pub fn insert( &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result, + f: impl FnMut(usize, &SourceColumnDesc) -> Result, ) -> Result { self.do_action::(f) } @@ -244,7 +244,7 @@ impl SourceStreamChunkRowWriter<'_> { /// * `f`: A failable closure that produced one [`Datum`] by corresponding [`SourceColumnDesc`]. pub fn delete( &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result, + f: impl FnMut(usize, &SourceColumnDesc) -> Result, ) -> Result { self.do_action::(f) } @@ -258,7 +258,7 @@ impl SourceStreamChunkRowWriter<'_> { /// [`SourceColumnDesc`]. pub fn update( &mut self, - f: impl FnMut(&SourceColumnDesc) -> Result<(Datum, Datum)>, + f: impl FnMut(usize, &SourceColumnDesc) -> Result<(Datum, Datum)>, ) -> Result { self.do_action::(f) } diff --git a/src/source/src/parser/native_parser.rs b/src/source/src/parser/native_parser.rs index 1aa7d81ffcad2..bcb8def470659 100644 --- a/src/source/src/parser/native_parser.rs +++ b/src/source/src/parser/native_parser.rs @@ -31,10 +31,8 @@ impl NativeParser { // Reclaim the ownership of the memory. // Previously leak the memory in `DatagenEventGenerator`. let boxed_row: Box = unsafe { Box::from_raw(payload.as_ptr() as *mut OwnedRow) }; - let mut idx = 0; - writer.insert(|_desc| { + writer.insert(|idx, _desc| { let datum = boxed_row.datum_at(idx).to_owned_datum(); - idx += 1; Ok(datum) }) } diff --git a/src/source/src/parser/protobuf/parser.rs b/src/source/src/parser/protobuf/parser.rs index a624b252c5dcf..331074d1f7ef3 100644 --- a/src/source/src/parser/protobuf/parser.rs +++ b/src/source/src/parser/protobuf/parser.rs @@ -166,7 +166,7 @@ impl ProtobufParser { let message = DynamicMessage::decode(self.message_descriptor.clone(), payload) .map_err(|e| ProtocolError(format!("parse message failed: {}", e)))?; - writer.insert(|column_desc| { + writer.insert(|_idx, column_desc| { let field_desc = message .descriptor() .get_field_by_name(&column_desc.name) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index d394199c10d4f..8a413e35c307d 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -781,7 +781,7 @@ mod tests { let row_id_index = Some(ProstColumnIndex { index: 0 }); let pk_column_ids = vec![0]; let stream_source_info = StreamSourceInfo { - row_format: ProstRowFormatType::Json as i32, + row_format: ProstRowFormatType::Native as i32, ..Default::default() }; let source_manager = Arc::new(TableSourceManager::default()); diff --git a/src/stream/src/executor/source/source_executor_v2.rs b/src/stream/src/executor/source/source_executor_v2.rs index 77496f90f289c..26ac6029c2372 100644 --- a/src/stream/src/executor/source/source_executor_v2.rs +++ b/src/stream/src/executor/source/source_executor_v2.rs @@ -511,7 +511,7 @@ mod tests { let pk_column_ids = vec![0]; let pk_indices = vec![0]; let source_info = StreamSourceInfo { - row_format: ProstRowFormatType::Json as i32, + row_format: ProstRowFormatType::Native as i32, ..Default::default() }; let (barrier_tx, barrier_rx) = unbounded_channel::(); @@ -602,7 +602,7 @@ mod tests { let pk_column_ids = vec![0]; let pk_indices = vec![0_usize]; let source_info = StreamSourceInfo { - row_format: ProstRowFormatType::Json as i32, + row_format: ProstRowFormatType::Native as i32, ..Default::default() }; let properties = convert_args!(hashmap!(