Commit: resolve conflict

wcy-fdu committed Dec 8, 2022
2 parents 84831a3 + 9a3b8da · commit 62a68a5
Showing 38 changed files with 231 additions and 233 deletions.
4 changes: 0 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion ci/scripts/build-other.sh
@@ -6,7 +6,7 @@ set -euo pipefail
source ci/scripts/common.env.sh

# Should set a stable version of connector node
-STABLE_VERSION=e31eb0bf6e4f708ceadce846538fc6bd55978c59
+STABLE_VERSION=7d454801e478e86c50a1e94cc139842554a0470d

echo "--- Build Java connector node"
git clone https://"$GITHUB_TOKEN"@github.com/risingwavelabs/risingwave-connector-node.git
4 changes: 2 additions & 2 deletions ci/scripts/build.sh
@@ -25,10 +25,10 @@ done
shift $((OPTIND -1))

echo "--- Rust cargo-sort check"
-cargo sort -c -w
+cargo sort --check --workspace

echo "--- Rust cargo-hakari check"
-cargo hakari verify
+cargo hakari generate --diff

echo "--- Rust format check"
cargo fmt --all -- --check
2 changes: 1 addition & 1 deletion ci/scripts/e2e-source-test.sh
@@ -58,7 +58,7 @@ mysql --host=mysql --port=3306 -u root -p123456 < ./e2e_test/source/cdc/mysql_cd
# start risingwave cluster
cargo make ci-start ci-1cn-1fe-with-recovery
# start cdc connector node
-nohup java -jar ./connector-service.jar --port 60061 > .risingwave/log/connector-source.log 2>&1 &
+nohup java -jar ./connector-service.jar --port 60061 > .risingwave/log/connector-node.log 2>&1 &
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt'
# wait for cdc loading
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
@@ -99,7 +99,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
-timeout_in_minutes: 5
+timeout_in_minutes: 8
retry: *auto-retry

- label: "unit and fuzz test"
2 changes: 1 addition & 1 deletion src/common/src/config.rs
@@ -521,7 +521,7 @@ mod default {
}

pub fn in_flight_barrier_nums() -> usize {
-40
+10
}

pub fn checkpoint_frequency() -> usize {
4 changes: 4 additions & 0 deletions src/config/ci-recovery.toml
@@ -2,3 +2,7 @@
disable_recovery = false
enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

+[streaming]
+barrier_interval_ms = 250
+checkpoint_frequency = 5
4 changes: 4 additions & 0 deletions src/config/ci.toml
@@ -2,3 +2,7 @@
disable_recovery = true
enable_committed_sst_sanity_check = true
max_heartbeat_interval_secs = 600

+[streaming]
+barrier_interval_ms = 250
+checkpoint_frequency = 5
4 changes: 2 additions & 2 deletions src/config/risingwave.toml
@@ -8,8 +8,8 @@ connection_pool_size = 16
[batch]

[streaming]
-barrier_interval_ms = 250
-in_flight_barrier_nums = 40
+barrier_interval_ms = 1000
+in_flight_barrier_nums = 10
checkpoint_frequency = 10

[storage]
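The default barrier interval rises from 250 ms to 1000 ms, while the CI configs above pin a faster 250 ms / 5-barrier cadence. Assuming `checkpoint_frequency` counts barriers per checkpoint (an assumption, not something this diff states), the effective checkpoint interval is simply their product — a minimal sketch:

```rust
// Sketch only; assumes checkpoint_frequency = barriers per checkpoint.
fn checkpoint_interval_ms(barrier_interval_ms: u64, checkpoint_frequency: u64) -> u64 {
    barrier_interval_ms * checkpoint_frequency
}

fn main() {
    // risingwave.toml after this change: 1000 ms * 10 barriers ≈ 10 s between checkpoints.
    assert_eq!(checkpoint_interval_ms(1000, 10), 10_000);
    // ci.toml / ci-recovery.toml: 250 ms * 5 barriers ≈ 1.25 s, keeping CI checkpoints frequent.
    assert_eq!(checkpoint_interval_ms(250, 5), 1_250);
}
```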
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -11,7 +11,7 @@ repository = { workspace = true }
anyhow = "1"
async-stream = "0.3"
async-trait = "0.1"
-aws-config = { workspace = true }
+aws-config = { workspace = true }
aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-sdk-sqs = { workspace = true }
27 changes: 20 additions & 7 deletions src/connector/src/sink/kafka.rs
@@ -23,8 +23,9 @@ use rdkafka::message::ToBytes;
use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::ClientConfig;
-use risingwave_common::array::{ArrayResult, Op, RowRef, StreamChunk};
+use risingwave_common::array::{ArrayError, ArrayResult, Op, RowRef, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::types::to_text::ToText;
use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl};
use serde::Deserialize;
use serde_json::{json, Map, Value};
@@ -247,8 +248,6 @@ impl Sink for KafkaSink {
// &self.in_transaction_epoch.unwrap()) && in_txn_epoch <= epoch { return Ok(())
// }

-println!("sink chunk {:?}", chunk);

match self.config.format.as_str() {
"append_only" => self.append_only(chunk, &self.schema).await,
"debezium" => {
@@ -318,6 +317,8 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value

let data_type = field.data_type();

+tracing::info!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);

let value = match (data_type, scalar_ref) {
(DataType::Boolean, ScalarRefImpl::Bool(v)) => {
json!(v)
@@ -342,10 +343,18 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value
}
(DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
// fixme
-json!(v.to_string())
+json!(v.to_text())
}
-(DataType::Time, ScalarRefImpl::NaiveTime(_v)) => {
-unimplemented!()
+(
+dt @ DataType::Date
+| dt @ DataType::Time
+| dt @ DataType::Timestamp
+| dt @ DataType::Timestampz
+| dt @ DataType::Interval
+| dt @ DataType::Bytea,
+scalar,
+) => {
+json!(scalar.to_text_with_type(&dt))
}
(DataType::List { .. }, ScalarRefImpl::List(list_ref)) => {
let mut vec = Vec::with_capacity(field.sub_fields.len());
@@ -371,7 +380,11 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value
}
json!(map)
}
-_ => unimplemented!(),
+_ => {
+return Err(ArrayError::internal(
+"datum_to_json_object: unsupported data type".to_string(),
+));
+}
};

Ok(value)
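The hunks above route date/time/interval/bytea values through their text representation and replace the catch-all `unimplemented!()` with an internal error. A minimal sketch of that shape on stand-in types (plain `serde_json`; `Scalar` and `scalar_to_json` are hypothetical, not the sink's real API):

```rust
use serde_json::{json, Value};

/// Hypothetical stand-in for the sink's scalar values.
enum Scalar {
    Bool(bool),
    Int64(i64),
    /// Date/time/interval/bytea values already rendered to text,
    /// mirroring the `to_text_with_type` call above.
    Text(String),
    Unsupported,
}

fn scalar_to_json(s: &Scalar) -> Result<Value, String> {
    Ok(match s {
        Scalar::Bool(v) => json!(v),
        Scalar::Int64(v) => json!(v),
        // Temporal-like values become JSON strings instead of panicking.
        Scalar::Text(t) => json!(t),
        // Unknown types surface as an error, not an `unimplemented!()` panic.
        Scalar::Unsupported => return Err("unsupported data type".to_string()),
    })
}

fn main() {
    assert_eq!(scalar_to_json(&Scalar::Int64(7)).unwrap(), json!(7));
    assert!(scalar_to_json(&Scalar::Unsupported).is_err());
}
```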
13 changes: 13 additions & 0 deletions src/connector/src/source/base.rs
@@ -258,6 +258,19 @@ mod tests {
Ok(())
}

+#[test]
+fn test_cdc_split_state() -> Result<()> {
+let offset_str = "{\"sourcePartition\":{\"server\":\"RW_CDC_mydb.products\"},\"sourceOffset\":{\"transaction_id\":null,\"ts_sec\":1670407377,\"file\":\"binlog.000001\",\"pos\":98587,\"row\":2,\"server_id\":1,\"event\":2}}";
+let split_impl = SplitImpl::Cdc(CdcSplit::new(1001, offset_str.to_string()));
+let encoded_split = split_impl.encode_to_bytes();
+let restored_split_impl = SplitImpl::restore_from_bytes(encoded_split.as_ref())?;
+assert_eq!(
+split_impl.encode_to_bytes(),
+restored_split_impl.encode_to_bytes()
+);
+Ok(())
+}

#[test]
fn test_extract_nexmark_config() {
let props: HashMap<String, String> = convert_args!(hashmap!(
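The added test round-trips a CDC split through `encode_to_bytes` / `restore_from_bytes`. A minimal sketch of that round-trip idea on a stand-in struct (plain serde_json; `FakeCdcSplit` is hypothetical, not RisingWave's `SplitImpl`):

```rust
use serde::{Deserialize, Serialize};

/// Stand-in with the same fields as the slimmed-down CdcSplit further below.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct FakeCdcSplit {
    source_id: u32,
    start_offset: Option<String>,
}

fn main() -> serde_json::Result<()> {
    let split = FakeCdcSplit {
        source_id: 1001,
        start_offset: Some(r#"{"file":"binlog.000001","pos":98587}"#.to_string()),
    };
    // encode_to_bytes / restore_from_bytes analogues.
    let bytes = serde_json::to_vec(&split)?;
    let restored: FakeCdcSplit = serde_json::from_slice(&bytes)?;
    assert_eq!(split, restored);
    Ok(())
}
```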
10 changes: 0 additions & 10 deletions src/connector/src/source/cdc/enumerator/mod.rs
@@ -17,14 +17,10 @@ use async_trait::async_trait;
use crate::source::cdc::{CdcProperties, CdcSplit};
use crate::source::SplitEnumerator;

-pub const MYSQL_CDC_PREFIX: &str = "RW_CDC_";

#[derive(Debug)]
pub struct DebeziumSplitEnumerator {
/// The source_id in the catalog
source_id: u32,
-/// Debezium will assign a partition identifier for each table
-partition: String,
}

#[async_trait]
@@ -33,21 +29,15 @@ impl SplitEnumerator for DebeziumSplitEnumerator {
type Split = CdcSplit;

async fn new(props: CdcProperties) -> anyhow::Result<DebeziumSplitEnumerator> {
-let partition = format!(
-"{}{}.{}",
-MYSQL_CDC_PREFIX, props.database_name, props.table_name
-);
Ok(Self {
source_id: props.source_id,
-partition,
})
}

async fn list_splits(&mut self) -> anyhow::Result<Vec<CdcSplit>> {
// CDC source only supports single split
let splits = vec![CdcSplit {
source_id: self.source_id,
-partition: self.partition.clone(),
start_offset: None,
}];
Ok(splits)
4 changes: 3 additions & 1 deletion src/connector/src/source/cdc/source/reader.rs
@@ -29,6 +29,7 @@ use crate::source::{BoxSourceStream, Column, ConnectorState, SplitImpl};

pub struct CdcSplitReader {
source_id: u64,
+start_offset: Option<String>,
props: CdcProperties,
}

@@ -50,6 +51,7 @@ impl SplitReader for CdcSplitReader {
if let SplitImpl::Cdc(cdc_split) = split {
return Ok(Self {
source_id: cdc_split.source_id as u64,
+start_offset: cdc_split.start_offset,
props,
});
}
@@ -79,7 +81,7 @@ impl CdcSplitReader {
database_name: props.database_name.clone(),
table_name: props.table_name.clone(),
partition: props.parititon.clone(),
-start_offset: props.start_offset.clone(),
+start_offset: self.start_offset.unwrap_or_default(),
include_schema_events: false,
},
)
9 changes: 3 additions & 6 deletions src/connector/src/source/cdc/split.rs
@@ -19,12 +19,10 @@ use serde::{Deserialize, Serialize};
use crate::source::{SplitId, SplitMetaData};

/// The states of a CDC split, which will be persisted to checkpoint.
-/// The offset will be updated when received a new chunk, see `StreamChunkWithState`.
-/// CDC source only has single split
+/// CDC source only has single split, so we use the `source_id` to identify the split.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct CdcSplit {
pub source_id: u32,
-pub partition: String,
pub start_offset: Option<String>,
}

@@ -43,15 +41,14 @@ impl SplitMetaData for CdcSplit {
}

impl CdcSplit {
-pub fn new(source_id: u32, partition: String, start_offset: String) -> CdcSplit {
+pub fn new(source_id: u32, start_offset: String) -> CdcSplit {
Self {
source_id,
-partition,
start_offset: Some(start_offset),
}
}

pub fn copy_with_offset(&self, start_offset: String) -> Self {
-Self::new(self.source_id, self.partition.clone(), start_offset)
+Self::new(self.source_id, start_offset)
}
}
11 changes: 7 additions & 4 deletions src/frontend/planner_test/src/lib.rs
@@ -21,7 +21,6 @@ mod resolve_id;

use std::collections::BTreeMap;
use std::path::Path;
-use std::sync::atomic::Ordering;
use std::sync::Arc;

use anyhow::{anyhow, bail, Result};
@@ -35,7 +34,7 @@ use risingwave_frontend::{
build_graph, explain_stream_graph, Binder, FrontendOpts, OptimizerContext, OptimizerContextRef,
PlanRef, Planner, WithOptions,
};
-use risingwave_sqlparser::ast::{ObjectName, Statement};
+use risingwave_sqlparser::ast::{ExplainOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use serde::{Deserialize, Serialize};

@@ -307,12 +306,16 @@ impl TestCase {
if result.is_some() {
panic!("two queries in one test case");
}
+let explain_options = ExplainOptions {
+verbose: true,
+..Default::default()
+};
let context = OptimizerContext::new(
session.clone(),
Arc::from(sql),
WithOptions::try_from(&stmt)?,
+explain_options,
);
-context.explain_verbose.store(true, Ordering::Relaxed); // use explain verbose in planner tests
let ret = self.apply_query(&stmt, context.into())?;
if do_check_result {
check_result(self, &ret)?;
@@ -413,7 +416,7 @@ impl TestCase {
stmt: &Statement,
context: OptimizerContextRef,
) -> Result<TestCaseResult> {
-let session = context.inner().session_ctx.clone();
+let session = context.session_ctx().clone();
let mut ret = TestCaseResult::default();

let bound = {
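Explain verbosity is now passed at construction time through an options struct instead of being flipped on an atomic flag afterwards. A minimal sketch of that struct-update pattern (the `ExplainOptionsLike` fields are stand-ins, not the real `ExplainOptions`):

```rust
/// Stand-in options struct; the real ExplainOptions lives in risingwave_frontend.
#[derive(Debug, Default)]
struct ExplainOptionsLike {
    verbose: bool,
    trace: bool,
}

fn main() {
    // Set only the field you care about; the rest comes from Default.
    let opts = ExplainOptionsLike {
        verbose: true,
        ..Default::default()
    };
    assert!(opts.verbose && !opts.trace);
}
```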
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/explain.yaml
@@ -46,7 +46,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 23,
"plan_node_id": 24,
"plan_node_type": "BatchProject",
"schema": [
{
@@ -59,7 +59,7 @@
],
"children": [
{
"plan_node_id": 21,
"plan_node_id": 22,
"plan_node_type": "BatchValues",
"schema": [],
"children": [],
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
@@ -99,7 +99,7 @@ pub fn gen_sink_plan(
// If colume names not specified, use the name in materialized view.
let col_names = get_column_names(&bound, session, stmt.columns)?;

-let properties = context.inner().with_options.clone();
+let properties = context.with_options().clone();

let mut plan_root = Planner::new(context).plan_query(bound)?;
let col_names = if let Some(col_names) = col_names {
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_table.rs
@@ -208,7 +208,7 @@ pub(crate) fn gen_create_table_plan(
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;
let row_id_index = row_id_index.map(|index| ProstColumnIndex { index: index as _ });
let pk_column_ids = pk_column_ids.into_iter().map(Into::into).collect();
-let properties = context.inner().with_options.inner().clone();
+let properties = context.with_options().inner().clone();

// TODO(Yuanxin): Detect if there is an external source based on `properties` (WITH CONNECTOR)
// and make prost source accordingly.