correctly handle skip_parse

The row-writer callbacks now receive the column index along with the column
descriptor: the closures passed to `SourceStreamChunkRowWriter::insert`,
`update`, and `delete` take `(idx, column)` instead of just `column`. Since
`do_action` never invokes the closure for columns marked `skip_parse`, a
parser that tracked the position itself (as `NativeParser` did with a mutable
counter) drifted out of sync whenever such a column appeared. The source
executor tests also switch their declared row format from `Json` to `Native`.

lmatz committed Dec 22, 2022
1 parent dc356ce commit 46de0cd
Showing 14 changed files with 31 additions and 33 deletions.

src/source/src/parser/avro/parser.rs (2 changes: 1 addition & 1 deletion)

@@ -205,7 +205,7 @@ impl AvroParser {
 };
 // parse the value to rw value
 if let Value::Record(fields) = avro_value {
-    writer.insert(|column| {
+    writer.insert(|_idx, column| {
         let tuple = fields.iter().find(|val| column.name.eq(&val.0)).unwrap();
         from_avro_value(tuple.1.clone()).map_err(|e| {
             tracing::error!(

src/source/src/parser/canal/json_parser.rs (6 changes: 3 additions & 3 deletions)

@@ -76,7 +76,7 @@ impl CanalJsonParser {
 
 let results = inserted
     .map(|v| {
-        writer.insert(|columns| {
+        writer.insert(|_idx, columns| {
             canal_json_parse_value(&columns.data_type, v.get(&columns.name))
                 .map_err(Into::into)
         })

@@ -106,7 +106,7 @@ impl CanalJsonParser {
 let results = before
     .zip(after)
     .map(|(before, after)| {
-        writer.update(|column| {
+        writer.update(|_idx, column| {
             // in origin canal, old only contains the changed columns but data
             // contains all columns.
             // in ticdc, old contains all fields

@@ -133,7 +133,7 @@
     })?;
 let results = deleted
     .map(|v| {
-        writer.delete(|columns| {
+        writer.delete(|_idx, columns| {
             canal_json_parse_value(&columns.data_type, v.get(&columns.name))
                 .map_err(Into::into)
         })

src/source/src/parser/canal/simd_json_parser.rs (6 changes: 3 additions & 3 deletions)

@@ -79,7 +79,7 @@ impl CanalJsonParser {
 let results = inserted
     .into_iter()
     .map(|v| {
-        writer.insert(|column| {
+        writer.insert(|_idx, column| {
             cannal_simd_json_parse_value(
                 &column.data_type,
                 v.get(column.name.as_str()),

@@ -117,7 +117,7 @@ impl CanalJsonParser {
 let results = before
     .zip_eq(after)
     .map(|(before, after)| {
-        writer.update(|column| {
+        writer.update(|_idx, column| {
             // in origin canal, old only contains the changed columns but data
             // contains all columns.
             // in ticdc, old contains all fields

@@ -151,7 +151,7 @@ impl CanalJsonParser {
 let results = deleted
     .into_iter()
     .map(|v| {
-        writer.delete(|column| {
+        writer.delete(|_idx, column| {
             cannal_simd_json_parse_value(
                 &column.data_type,
                 v.get(column.name.as_str()),

src/source/src/parser/csv_parser.rs (2 changes: 1 addition & 1 deletion)

@@ -130,7 +130,7 @@ impl CsvParser {
     Some(strings) => strings,
 };
 writer
-    .insert(move |desc| {
+    .insert(move |_idx, desc| {
         let column_id = desc.column_id.get_id();
         let column_type = &desc.data_type;
         let v = match columns_string.get(column_id as usize) {

src/source/src/parser/debezium/json_parser.rs (6 changes: 3 additions & 3 deletions)

@@ -65,7 +65,7 @@ impl DebeziumJsonParser {
     ))
 })?;
 
-writer.update(|column| {
+writer.update(|_idx, column| {
     let before = json_parse_value(&column.data_type, before.get(&column.name))?;
     let after = json_parse_value(&column.data_type, after.get(&column.name))?;
 

@@ -79,7 +79,7 @@
     ))
 })?;
 
-writer.insert(|column| {
+writer.insert(|_idx, column| {
     json_parse_value(&column.data_type, after.get(&column.name)).map_err(Into::into)
 })
 }

@@ -90,7 +90,7 @@
     ))
 })?;
 
-writer.delete(|column| {
+writer.delete(|_idx, column| {
     json_parse_value(&column.data_type, before.get(&column.name))
         .map_err(Into::into)
 })

src/source/src/parser/debezium/simd_json_parser.rs (6 changes: 3 additions & 3 deletions)

@@ -79,7 +79,7 @@ impl DebeziumJsonParser {
     ))
 })?;
 
-writer.update(|column| {
+writer.update(|_idx, column| {
     let before =
         simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))?;
     let after =

@@ -98,7 +98,7 @@
     ))
 })?;
 
-writer.insert(|column| {
+writer.insert(|_idx, column| {
     simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))
         .map_err(Into::into)
 })

@@ -113,7 +113,7 @@
     ))
 })?;
 
-writer.delete(|column| {
+writer.delete(|_idx, column| {
     simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))
         .map_err(Into::into)
 })

src/source/src/parser/json_parser.rs (2 changes: 1 addition & 1 deletion)

@@ -97,7 +97,7 @@ impl JsonParser {
 let value: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut)
     .map_err(|e| RwError::from(ProtocolError(e.to_string())))?;
 
-writer.insert(|desc| {
+writer.insert(|_idx, desc| {
     simd_json_parse_value(&desc.data_type, value.get(desc.name.as_str())).map_err(|e| {
         tracing::error!(
             "failed to process value ({}): {}",

src/source/src/parser/maxwell/json_parser.rs (6 changes: 3 additions & 3 deletions)

@@ -54,7 +54,7 @@ impl MaxwellParser {
         "data is missing for creating event".to_string(),
     ))
 })?;
-writer.insert(|column| {
+writer.insert(|_idx, column| {
     json_parse_value(&column.data_type, after.get(&column.name)).map_err(Into::into)
 })
 }

@@ -70,7 +70,7 @@
     ))
 })?;
 
-writer.update(|column| {
+writer.update(|_idx, column| {
     // old only contains the changed columns but data contains all columns.
     let before_value = before
         .get(column.name.as_str())

@@ -84,7 +84,7 @@
 let before = event.after.as_ref().ok_or_else(|| {
     RwError::from(ProtocolError("old is missing for delete event".to_string()))
 })?;
-writer.delete(|column| {
+writer.delete(|_idx, column| {
     json_parse_value(&column.data_type, before.get(&column.name))
         .map_err(Into::into)
 })

src/source/src/parser/maxwell/simd_json_parser.rs (6 changes: 3 additions & 3 deletions)

@@ -53,7 +53,7 @@ impl MaxwellParser {
         "data is missing for creating event".to_string(),
     ))
 })?;
-writer.insert(|column| {
+writer.insert(|_idx, column| {
     simd_json_parse_value(&column.data_type, after.get(column.name.as_str()))
         .map_err(Into::into)
 })

@@ -70,7 +70,7 @@
     ))
 })?;
 
-writer.update(|column| {
+writer.update(|_idx, column| {
     // old only contains the changed columns but data contains all columns.
     let before_value = before
         .get(column.name.as_str())

@@ -85,7 +85,7 @@
 let before = event.get(AFTER).ok_or_else(|| {
     RwError::from(ProtocolError("old is missing for delete event".to_string()))
 })?;
-writer.delete(|column| {
+writer.delete(|_idx, column| {
     simd_json_parse_value(&column.data_type, before.get(column.name.as_str()))
         .map_err(Into::into)
 })

src/source/src/parser/mod.rs (10 changes: 5 additions & 5 deletions)

@@ -189,7 +189,7 @@ impl OpAction for OpActionUpdate {
 impl SourceStreamChunkRowWriter<'_> {
     fn do_action<A: OpAction>(
         &mut self,
-        mut f: impl FnMut(&SourceColumnDesc) -> Result<A::Output>,
+        mut f: impl FnMut(usize, &SourceColumnDesc) -> Result<A::Output>,
     ) -> Result<WriteGuard> {
         // The closure `f` may fail so that a part of builders were appended incompletely.
         // Loop invariant: `builders[0..appended_idx)` has been appended on every iter ended or loop

@@ -204,7 +204,7 @@ impl SourceStreamChunkRowWriter<'_> {
 let output = if desc.skip_parse {
     A::DEFAULT_OUTPUT
 } else {
-    f(desc)?
+    f(idx, desc)?
 };
 A::apply(builder, output);
 appended_idx = idx + 1;

@@ -231,7 +231,7 @@ impl SourceStreamChunkRowWriter<'_> {
 /// * `f`: A fallible closure that produces one [`Datum`] from the corresponding [`SourceColumnDesc`].
 pub fn insert(
     &mut self,
-    f: impl FnMut(&SourceColumnDesc) -> Result<Datum>,
+    f: impl FnMut(usize, &SourceColumnDesc) -> Result<Datum>,
 ) -> Result<WriteGuard> {
     self.do_action::<OpActionInsert>(f)
 }

@@ -244,7 +244,7 @@ impl SourceStreamChunkRowWriter<'_> {
 /// * `f`: A fallible closure that produces one [`Datum`] from the corresponding [`SourceColumnDesc`].
 pub fn delete(
     &mut self,
-    f: impl FnMut(&SourceColumnDesc) -> Result<Datum>,
+    f: impl FnMut(usize, &SourceColumnDesc) -> Result<Datum>,
 ) -> Result<WriteGuard> {
     self.do_action::<OpActionDelete>(f)
 }

@@ -258,7 +258,7 @@ impl SourceStreamChunkRowWriter<'_> {
 /// [`SourceColumnDesc`].
 pub fn update(
     &mut self,
-    f: impl FnMut(&SourceColumnDesc) -> Result<(Datum, Datum)>,
+    f: impl FnMut(usize, &SourceColumnDesc) -> Result<(Datum, Datum)>,
 ) -> Result<WriteGuard> {
     self.do_action::<OpActionUpdate>(f)
 }
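
The hunks above define the new contract: `do_action` walks the column
descriptors, substitutes `A::DEFAULT_OUTPUT` for `skip_parse` columns without
calling the closure, and otherwise passes the current index through. A
minimal, self-contained sketch of that shape (the types here are simplified
stand-ins, not the real definitions; `Datum` is modeled as `Option<i64>` and
errors as `String`):

    struct SourceColumnDesc {
        name: String,
        skip_parse: bool,
    }

    type Datum = Option<i64>; // stand-in; `None` plays the role of `A::DEFAULT_OUTPUT`

    fn do_action(
        descs: &[SourceColumnDesc],
        mut f: impl FnMut(usize, &SourceColumnDesc) -> Result<Datum, String>,
    ) -> Result<Vec<Datum>, String> {
        let mut builders = Vec::with_capacity(descs.len());
        for (idx, desc) in descs.iter().enumerate() {
            // For `skip_parse` columns the closure is never invoked, but `idx`
            // still advances, so `f` always observes the true column position.
            let output = if desc.skip_parse { None } else { f(idx, desc)? };
            builders.push(output);
        }
        Ok(builders)
    }

A closure such as `|idx, desc| ...` can therefore index into positional data
directly, which is exactly what `NativeParser` needs below.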

src/source/src/parser/native_parser.rs (4 changes: 1 addition & 3 deletions)

@@ -31,10 +31,8 @@ impl NativeParser {
 // Reclaim the ownership of the memory.
 // Previously leak the memory in `DatagenEventGenerator`.
 let boxed_row: Box<OwnedRow> = unsafe { Box::from_raw(payload.as_ptr() as *mut OwnedRow) };
-let mut idx = 0;
-writer.insert(|_desc| {
+writer.insert(|idx, _desc| {
     let datum = boxed_row.datum_at(idx).to_owned_datum();
-    idx += 1;
     Ok(datum)
 })
 }
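
This hunk is the substance of the fix. The old closure kept its own counter,
which advanced only when the closure actually ran; since `do_action` skips the
closure for `skip_parse` columns, each skipped column shifted every later read
by one. A hypothetical, runnable illustration of the drift (plain arrays in
place of `OwnedRow` and the chunk builders):

    fn main() {
        // Three columns; the middle one is marked `skip_parse`.
        let row: [Option<i64>; 3] = [Some(10), Some(20), Some(30)];
        let skip_parse = [false, true, false];

        // Old scheme: a self-maintained counter.
        let mut idx = 0;
        let mut old: Vec<Option<i64>> = Vec::new();
        for &skip in &skip_parse {
            if skip {
                old.push(None); // closure skipped, counter NOT advanced
            } else {
                old.push(row[idx]);
                idx += 1;
            }
        }
        assert_eq!(old, vec![Some(10), None, Some(20)]); // column 2 read row[1]

        // New scheme: the writer supplies the true column index.
        let new: Vec<Option<i64>> = skip_parse
            .iter()
            .enumerate()
            .map(|(idx, &skip)| if skip { None } else { row[idx] })
            .collect();
        assert_eq!(new, vec![Some(10), None, Some(30)]); // aligned
    }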

src/source/src/parser/protobuf/parser.rs (2 changes: 1 addition & 1 deletion)

@@ -166,7 +166,7 @@ impl ProtobufParser {
 
 let message = DynamicMessage::decode(self.message_descriptor.clone(), payload)
     .map_err(|e| ProtocolError(format!("parse message failed: {}", e)))?;
-writer.insert(|column_desc| {
+writer.insert(|_idx, column_desc| {
     let field_desc = message
         .descriptor()
         .get_field_by_name(&column_desc.name)

src/stream/src/executor/source/source_executor.rs (2 changes: 1 addition & 1 deletion)

@@ -781,7 +781,7 @@ mod tests {
 let row_id_index = Some(ProstColumnIndex { index: 0 });
 let pk_column_ids = vec![0];
 let stream_source_info = StreamSourceInfo {
-    row_format: ProstRowFormatType::Json as i32,
+    row_format: ProstRowFormatType::Native as i32,
     ..Default::default()
 };
 let source_manager = Arc::new(TableSourceManager::default());

src/stream/src/executor/source/source_executor_v2.rs (4 changes: 2 additions & 2 deletions)

@@ -511,7 +511,7 @@ mod tests {
 let pk_column_ids = vec![0];
 let pk_indices = vec![0];
 let source_info = StreamSourceInfo {
-    row_format: ProstRowFormatType::Json as i32,
+    row_format: ProstRowFormatType::Native as i32,
     ..Default::default()
 };
 let (barrier_tx, barrier_rx) = unbounded_channel::<Barrier>();

@@ -602,7 +602,7 @@ mod tests {
 let pk_column_ids = vec![0];
 let pk_indices = vec![0_usize];
 let source_info = StreamSourceInfo {
-    row_format: ProstRowFormatType::Json as i32,
+    row_format: ProstRowFormatType::Native as i32,
     ..Default::default()
 };
 let properties = convert_args!(hashmap!(
