feat: Initial support for kafka timestamp pushdown. (#7150)
Support limiting the Kafka query range through a condition like `_rw_kafka_timestamp > '2022-01-01 00:00:00+00:00'`.
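As a usage sketch (not part of the diff): a batch query over a Kafka source can carry a range predicate on the hidden `_rw_kafka_timestamp` column, and the split enumerator translates the time bound into per-partition start/stop offsets so only the matching messages are fetched. The source definition, topic, and broker address below are illustrative placeholders modeled on the e2e test setup:

```sql
-- Illustrative Kafka source; topic and broker address are placeholders.
CREATE SOURCE s1 (v1 int, v2 varchar) WITH (
    connector = 'kafka',
    topic = 'kafka_1_partition_topic',
    properties.bootstrap.server = '127.0.0.1:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;

-- Only messages with a Kafka timestamp after 2022-01-01 are scanned;
-- the predicate is pushed down to the enumerator as start/stop offsets.
SELECT * FROM s1 WHERE _rw_kafka_timestamp > '2022-01-01 00:00:00+00:00';
```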

Approved-By: ZENOTME
Approved-By: tabVersion
liurenjie1024 authored Jan 3, 2023
1 parent d2ef784 commit 1406718
Showing 11 changed files with 342 additions and 48 deletions.
12 changes: 12 additions & 0 deletions e2e_test/source/basic/kafka_batch.slt
@@ -76,6 +76,18 @@ select * from s1
3 333
4 4444

query IT rowsort
select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00'
----
1 1
2 22
3 333
4 4444

query IT
select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00'
----

query IT rowsort
select * from s1 limit 2
----
60 changes: 40 additions & 20 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -139,6 +139,8 @@ impl KafkaSplitEnumerator {
} else {
None
};

// println!("Start offset: {:?}", expect_start_offset);
let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
Some(
self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
@@ -147,6 +149,7 @@
} else {
None
};
// println!("Stop offset: {:?}", expect_stop_offset);

// Watermark here has nothing to do with watermark in streaming processing. Watermark
// here means smallest/largest offset available for reading.
Expand All @@ -161,28 +164,45 @@ impl KafkaSplitEnumerator {
}
ret
};
// println!("Watermark: {:?}", watermarks);

topic_partitions.iter().map(|partition| {
let (low, high) = watermarks.remove(&partition).unwrap();
let start_offset = {
let start = expect_start_offset.as_mut().map(|m| m.remove(partition).unwrap_or(Some(low))).unwrap_or(Some(low)).unwrap_or(low);
i64::max(start, low)
};
let stop_offset = {
let stop = expect_stop_offset.as_mut().map(|m| m.remove(partition).unwrap_or(Some(high))).unwrap_or(Some(high)).unwrap_or(high);
i64::min(stop, high)
};

if start_offset > stop_offset {
return Err(anyhow!(format!("topic {} partition {}: requested start offset {} is greater than stop offset {}",self.topic, partition, start_offset, stop_offset)));
}
Ok(KafkaSplit {
topic: self.topic.clone(),
partition: *partition,
start_offset: Some(start_offset),
stop_offset: Some(stop_offset),
Ok(topic_partitions
.iter()
.map(|partition| {
let (low, high) = watermarks.remove(&partition).unwrap();
let start_offset = {
let start = expect_start_offset
.as_mut()
.map(|m| m.remove(partition).flatten().map(|t| t-1).unwrap_or(low))
.unwrap_or(low);
i64::max(start, low)
};
let stop_offset = {
let stop = expect_stop_offset
.as_mut()
.map(|m| m.remove(partition).unwrap_or(Some(high)))
.unwrap_or(Some(high))
.unwrap_or(high);
i64::min(stop, high)
};

if start_offset > stop_offset {
tracing::warn!(
"Skipping topic {} partition {}: requested start offset {} is greater than stop offset {}",
self.topic,
partition,
start_offset,
stop_offset
);
}
KafkaSplit {
topic: self.topic.clone(),
partition: *partition,
start_offset: Some(start_offset),
stop_offset: Some(stop_offset),
}
})
}).collect::<anyhow::Result<Vec<KafkaSplit>>>()
.collect::<Vec<KafkaSplit>>())
}

async fn fetch_stop_offset(
15 changes: 12 additions & 3 deletions src/connector/src/source/kafka/source/reader.rs
@@ -29,6 +29,7 @@ use crate::source::{BoxSourceStream, Column, ConnectorState, SplitImpl};

pub struct KafkaSplitReader {
consumer: StreamConsumer<DefaultConsumerContext>,
start_offset: Option<i64>,
stop_offset: Option<i64>,
bytes_per_second: usize,
}
@@ -73,6 +74,7 @@ impl SplitReader for KafkaSplitReader {
.await
.context("failed to create kafka consumer")?;

let mut start_offset = None;
let mut stop_offset = None;
if let Some(splits) = state {
assert_eq!(splits.len(), 1);
@@ -81,6 +83,7 @@
for split in &splits {
if let SplitImpl::Kafka(k) = split {
if let Some(offset) = k.start_offset {
start_offset = Some(offset);
tpl.add_partition_offset(
k.topic.as_str(),
k.partition,
@@ -105,6 +108,7 @@

Ok(Self {
consumer,
start_offset,
stop_offset,
bytes_per_second,
})
@@ -118,9 +122,14 @@
impl KafkaSplitReader {
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
pub async fn into_stream(self) {
if let Some(stop_offset) = self.stop_offset && stop_offset == 0{
yield Vec::new();
return Ok(());
if let Some(stop_offset) = self.stop_offset {
if let Some(start_offset) = self.start_offset && (start_offset+1) >= stop_offset {
yield Vec::new();
return Ok(());
} else if stop_offset == 0 {
yield Vec::new();
return Ok(());
}
}
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.tick().await;
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/batch_source.yaml
@@ -3,11 +3,11 @@
select * from s
logical_plan: |
LogicalProject { exprs: [id, value] }
└─LogicalSource { source: s, columns: [_row_id, id, value] }
└─LogicalSource { source: s, columns: [_row_id, id, value], time_range: [(Unbounded, Unbounded)] }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [id, value] }
└─BatchSource { source: "s", columns: ["_row_id", "id", "value"] }
└─BatchSource { source: "s", columns: ["_row_id", "id", "value"], filter: (None, None) }
create_source:
row_format: protobuf
name: s
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/share.yaml
@@ -30,13 +30,13 @@
| └─BatchFilter { predicate: (initial_bid = 1:Int32) }
| └─BatchProject { exprs: [id, initial_bid] }
| └─BatchFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
| └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
| └─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"], filter: (None, None) }
└─BatchExchange { order: [], dist: HashShard(id) }
└─BatchProject { exprs: [id] }
└─BatchFilter { predicate: (initial_bid = 2:Int32) }
└─BatchProject { exprs: [id, initial_bid] }
└─BatchFilter { predicate: ((initial_bid = 1:Int32) OR (initial_bid = 2:Int32)) }
└─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"] }
└─BatchSource { source: "auction", columns: ["id", "item_name", "description", "initial_bid", "reserve", "date_time", "expires", "seller", "category", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [cnt], pk_columns: [] }
└─StreamProject { exprs: [sum0(count)] }
@@ -105,15 +105,15 @@
| └─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
| └─BatchExchange { order: [], dist: HashShard(auction) }
| └─BatchProject { exprs: [auction, date_time] }
| └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
| └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"], filter: (None, None) }
└─BatchProject { exprs: [max(count), window_start] }
└─BatchHashAgg { group_key: [window_start], aggs: [max(count)] }
└─BatchExchange { order: [], dist: HashShard(window_start) }
└─BatchHashAgg { group_key: [auction, window_start], aggs: [count] }
└─BatchHopWindow { time_col: date_time, slide: 00:00:02, size: 00:00:10, output: [auction, window_start] }
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchProject { exprs: [auction, date_time] }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"] }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "date_time", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [auction, num, window_start(hidden), window_start#1(hidden)], pk_columns: [window_start, auction, window_start#1] }
└─StreamProject { exprs: [auction, count, window_start, window_start] }
38 changes: 30 additions & 8 deletions src/frontend/src/handler/create_source.rs
@@ -16,8 +16,10 @@ use std::collections::HashMap;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::error::ErrorCode::{self, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::catalog::{
ColumnIndex as ProstColumnIndex, Source as ProstSource, StreamSourceInfo,
};
@@ -29,7 +31,9 @@ use super::create_table::{bind_sql_columns, bind_sql_table_constraints, gen_mate
use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::column_catalog::ColumnCatalog;
use crate::catalog::ColumnId;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::KAFKA_TIMESTAMP_COLUMN_NAME;
use crate::optimizer::OptimizerContext;
use crate::stream_fragmenter::build_graph;

@@ -83,23 +87,40 @@ pub async fn handle_create_source(
is_materialized: bool,
stmt: CreateSourceStatement,
) -> Result<RwPgResponse> {
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(stmt.columns)?;
let with_properties = handler_args.with_options.inner().clone();

// confluent schema registry must be used with kafka
let is_kafka = with_properties
.get("connector")
.unwrap_or(&"".to_string())
.to_lowercase()
.eq("kafka");

let (mut column_descs, pk_column_id_from_columns) = bind_sql_columns(stmt.columns)?;

// Add hidden column `_rw_kafka_timestamp` to each message
if is_kafka && !is_materialized {
let kafka_timestamp_column = ColumnDesc {
data_type: DataType::Timestamptz,
column_id: ColumnId::new(column_descs.len() as i32),
name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(),
field_descs: vec![],
type_name: "".to_string(),
};
column_descs.push(kafka_timestamp_column);
}

let (mut columns, pk_column_ids, row_id_index) =
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, stmt.constraints)?;

if row_id_index.is_none() && !is_materialized {
return Err(ErrorCode::InvalidInputSyntax(
"The non-materialized source does not support PRIMARY KEY constraint, please use \"CREATE MATERIALIZED SOURCE\" instead".to_owned(),
)
.into());
}
let with_properties = handler_args.with_options.inner().clone();

const UPSTREAM_SOURCE_KEY: &str = "connector";
// confluent schema registry must be used with kafka
let is_kafka = with_properties
.get("connector")
.unwrap_or(&"".to_string())
.to_lowercase()
.eq("kafka");
if !is_kafka
&& matches!(
&stmt.source_schema,
@@ -117,6 +138,7 @@
UPSTREAM_SOURCE_KEY
))));
}

let (columns, source_info) = match &stmt.source_schema {
SourceSchema::Protobuf(protobuf_schema) => {
if columns.len() != 1 || pk_column_ids != vec![0.into()] || row_id_index != Some(0) {
5 changes: 3 additions & 2 deletions src/frontend/src/handler/create_table.rs
@@ -188,10 +188,11 @@ pub fn bind_sql_table_constraints(
let mut columns_catalog = column_descs
.into_iter()
.map(|c| {
// All columns except `_row_id` or those starting with `_rw` should be visible.
let is_hidden = c.name.starts_with("_rw");
ColumnCatalog {
column_desc: c,
// All columns except `_row_id` should be visible.
is_hidden: false,
is_hidden,
}
})
.collect_vec();
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -38,6 +38,7 @@ impl BatchSource {
Distribution::Single,
Order::any(),
);

Self { base, logical }
}

@@ -71,6 +72,7 @@ impl fmt::Display for BatchSource {
builder
.field("source", &self.logical.source_catalog().unwrap().name)
.field("columns", &self.column_names())
.field("filter", &self.logical.kafka_timestamp_range_value())
.finish()
}
}