Skip to content

Commit

Permalink
renaming thresholds (#756)
Browse files (browse the repository at this point in the history)
  • Loading branch information
chenziliang authored Jun 2, 2024
1 parent 9e80aa3 commit e795216
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 35 deletions.
6 changes: 3 additions & 3 deletions src/Interpreters/Streaming/DDLHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ const std::vector<String> CREATE_TABLE_SETTINGS = {
"logstore_flush_ms",
"logstore_replication_factor",
"distributed_ingest_mode",
"distributed_flush_threshold_ms",
"distributed_flush_threshold_count",
"distributed_flush_threshold_bytes",
"flush_threshold_ms",
"flush_threshold_count",
"flush_threshold_bytes",
"storage_type",
"logstore",
"mode",
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ struct Settings;
M(Int64, logstore_request_timeout_ms, 30000, "Time out value for an ingest request to the backend write-ahead log", 0) \
M(Int64, logstore_flush_messages, 1000, "Tell streaming storage to call fsync per flush messages", 0) \
M(Int64, logstore_flush_ms, 120000, "Tell streaming storage to call fsync every flush_ms interval", 0) \
M(Int64, distributed_flush_threshold_ms, 2000, "Time threshold for streaming storage to flush consumed data from write-ahead log", 0) \
M(Int64, distributed_flush_threshold_count, 100000, "Row count threshold for streaming storage to flush consumed data from write-ahead log", 0) \
M(Int64, distributed_flush_threshold_bytes, 10 * 1024 * 1024, "Data size threshold for streaming storage to flush consumed data from write-ahead log", 0) \
M(Int64, flush_threshold_ms, 2000, "Time threshold for streaming storage to flush consumed data from write-ahead log", 0) \
M(Int64, flush_threshold_count, 100000, "Row count threshold for streaming storage to flush consumed data from write-ahead log", 0) \
M(Int64, flush_threshold_bytes, 10 * 1024 * 1024, "Data size threshold for streaming storage to flush consumed data from write-ahead log", 0) \
// End of STREAM_SETTINGS

#define CONFIGURABLE_STREAM_SETTINGS(M) \
Expand Down
28 changes: 13 additions & 15 deletions src/Storages/Streaming/StreamShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,13 @@ void StreamShard::backgroundPollNativeLog()

LOG_INFO(
log,
"Start consuming records from shard={} sn={} distributed_flush_threshold_ms={} "
"distributed_flush_threshold_count={} "
"distributed_flush_threshold_bytes={} with missing_sequence_ranges={}",
"Start consuming records from shard={} sn={} flush_threshold_ms={} flush_threshold_count={} "
"flush_threshold_bytes={} with missing_sequence_ranges={}",
shard,
snLoaded(),
ssettings->distributed_flush_threshold_ms.value,
ssettings->distributed_flush_threshold_count.value,
ssettings->distributed_flush_threshold_bytes.value,
ssettings->flush_threshold_ms.value,
ssettings->flush_threshold_count.value,
ssettings->flush_threshold_bytes.value,
sequenceRangesToString(missing_sequence_ranges));

StreamCallbackData stream_commit{this, missing_sequence_ranges};
Expand All @@ -227,9 +226,9 @@ void StreamShard::backgroundPollNativeLog()
size_t batch_size = 100;
batch.reserve(batch_size);

size_t rows_threshold = ssettings->distributed_flush_threshold_count.value;
size_t bytes_threshold = ssettings->distributed_flush_threshold_bytes.value;
Int64 interval_threshold = ssettings->distributed_flush_threshold_ms.value;
size_t rows_threshold = ssettings->flush_threshold_count.value;
size_t bytes_threshold = ssettings->flush_threshold_bytes.value;
Int64 interval_threshold = ssettings->flush_threshold_ms.value;

size_t current_bytes_in_batch = 0;
size_t current_rows_in_batch = 0;
Expand Down Expand Up @@ -302,9 +301,8 @@ void StreamShard::backgroundPollKafka()

LOG_INFO(
log,
"Start consuming records from shard={} sn={} distributed_flush_threshold_ms={} "
"distributed_flush_threshold_count={} "
"distributed_flush_threshold_bytes={} with missing_sequence_ranges={}",
"Start consuming records from shard={} sn={} flush_threshold_ms={} flush_threshold_count={} "
"flush_threshold_bytes={} with missing_sequence_ranges={}",
shard,
kafka->consume_ctx.offset,
kafka->consume_ctx.consume_callback_timeout_ms,
Expand Down Expand Up @@ -885,9 +883,9 @@ void StreamShard::initKafkaLog()
/// To ensure the replica can consume records inserted after its downtime.
kafka->consume_ctx.enforce_offset = true;
kafka->consume_ctx.auto_offset_reset = ssettings->logstore_auto_offset_reset.value;
kafka->consume_ctx.consume_callback_timeout_ms = static_cast<int32_t>(ssettings->distributed_flush_threshold_ms.value);
kafka->consume_ctx.consume_callback_max_rows = static_cast<int32_t>(ssettings->distributed_flush_threshold_count.value);
kafka->consume_ctx.consume_callback_max_bytes = static_cast<int32_t>(ssettings->distributed_flush_threshold_bytes.value);
kafka->consume_ctx.consume_callback_timeout_ms = static_cast<int32_t>(ssettings->flush_threshold_ms.value);
kafka->consume_ctx.consume_callback_max_rows = static_cast<int32_t>(ssettings->flush_threshold_count.value);
kafka->consume_ctx.consume_callback_max_bytes = static_cast<int32_t>(ssettings->flush_threshold_bytes.value);
kafka->log->initConsumerTopicHandle(kafka->consume_ctx);
}

Expand Down
28 changes: 14 additions & 14 deletions tests/stream/test_stream_smoke/0023_mv.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
{"client":"python", "query_type": "table", "query":"drop view if exists mv_with_target_stream"},
{"client":"python", "query_type": "table", "query":"drop stream if exists source_mv_stream"},
{"client":"python", "query_type": "table", "query":"drop stream if exists target_mv_stream"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "depends_on_stream":"source_mv_stream", "exist":"mv_with_target_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream into target_mv_stream as select _tp_time, i + 1 as ii from source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into source_mv_stream(i) values (1), (2), (3)"},
{"client":"python", "query_type": "table", "query_id":"2401", "wait":1, "query":"select ii from table(mv_with_target_stream)"},
Expand Down Expand Up @@ -48,8 +48,8 @@
{"client":"python", "query_type": "table", "query":"drop view if exists mv_with_target_stream_2"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists target_mv_stream"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"mv_with_target_stream_1", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream_1 into target_mv_stream as select _tp_time, i + 1 as ii from source_mv_stream"},
{"client":"python", "query_type": "table", "exist":"mv_with_target_stream_2", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream_2 into target_mv_stream as select _tp_time, i + 10 as ii from source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into source_mv_stream(i) values (1), (2), (3)"},
Expand Down Expand Up @@ -82,8 +82,8 @@
{"client":"python", "query_type": "table", "query":"drop view if exists mv_with_target_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists target_mv_stream"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int, s string) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int, s string) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "depends_on_stream":"source_mv_stream", "exist":"mv_with_target_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream into target_mv_stream as select _tp_time, i + 1 as ii from source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into source_mv_stream(i) values (1), (2), (3)"},
{"client":"python", "query_type": "table", "query_id":"2409", "wait":1, "query":"select ii, s from table(target_mv_stream)"}
Expand Down Expand Up @@ -111,8 +111,8 @@
{"client":"python", "query_type": "table", "query":"drop view if exists mv_with_target_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists target_mv_stream"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int, s string) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream (i int) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream (ii int, s string) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "depends_on_stream":"source_mv_stream", "exist":"mv_with_inner_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_inner_stream as select _tp_time, i + 1 as ii from source_mv_stream"},
{"client":"python", "query_type": "table", "depends_on_stream":"target_mv_stream", "exist":"mv_with_target_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream into target_mv_stream as select _tp_time, i + 1 as ii from source_mv_stream"},
{"client":"python", "query_type": "table", "query_id":"2400", "wait":2, "query":"alter stream mv_with_inner_stream modify setting logstore_retention_bytes=1000000"},
Expand Down Expand Up @@ -152,9 +152,9 @@
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_versioned_kv"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists target_mv_stream"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream(i int, v string, t datetime64(3, 'UTC')) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_versioned_kv", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_versioned_kv(id int, value string) primary key id settings mode='versioned_kv', distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream(customerId int, timestamp string, recordValue string, value string) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream(i int, v string, t datetime64(3, 'UTC')) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_versioned_kv", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_versioned_kv(id int, value string) primary key id settings mode='versioned_kv', flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream(customerId int, timestamp string, recordValue string, value string) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "depends_on_stream":"source_mv_versioned_kv", "exist":"mv_with_inner_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_inner_stream as with results as (select i, format_datetime(window_end, '%Y-%m-%d %H:%i:%S', 'UTC') AS timestamp, to_string(max(v)) AS recordValue from tumble(source_mv_stream, t, 2s) group by i, window_end) select source_mv_versioned_kv.id as customerId, results.timestamp as timestamp, results.recordValue as recordValue, source_mv_versioned_kv.value as value from results inner join source_mv_versioned_kv on i = id"},
{"client":"python", "query_type": "table", "depends_on_stream":"target_mv_stream", "exist":"mv_with_target_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream into target_mv_stream as with results as (select i, format_datetime(window_end, '%Y-%m-%d %H:%i:%S', 'UTC') AS timestamp, to_string(max(v)) AS recordValue from tumble(source_mv_stream, t, 2s) group by i, window_end) select source_mv_versioned_kv.id as customerId, results.timestamp as timestamp, results.recordValue as recordValue, source_mv_versioned_kv.value as value from results inner join source_mv_versioned_kv on i = id"},
{"client":"python", "query_type": "stream", "query_id":"2400", "terminate": "manual", "wait":1, "query":"select (* except _tp_time) from mv_with_inner_stream union select (* except _tp_time) from mv_with_target_stream"},
Expand Down Expand Up @@ -196,9 +196,9 @@
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_stream"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists source_mv_versioned_kv"},
{"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists target_mv_stream"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream(i int, v string, t datetime64(3, 'UTC')) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_versioned_kv", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_versioned_kv(id int, value string) primary key id settings mode='versioned_kv', distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream(customerId int, timestamp string, recordValue string, value string) settings distributed_flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_stream(i int, v string, t datetime64(3, 'UTC')) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"source_mv_versioned_kv", "exist_wait":2, "wait":1, "query":"create stream if not exists source_mv_versioned_kv(id int, value string) primary key id settings mode='versioned_kv', flush_threshold_count=1"},
{"client":"python", "query_type": "table", "exist":"target_mv_stream", "exist_wait":2, "wait":1, "query":"create stream if not exists target_mv_stream(customerId int, timestamp string, recordValue string, value string) settings flush_threshold_count=1"},
{"client":"python", "query_type": "table", "depends_on_stream":"source_mv_versioned_kv", "exist":"mv_with_inner_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_inner_stream as with results as (select i, format_datetime(window_end, '%Y-%m-%d %H:%i:%S', 'UTC') AS timestamp, to_string(max(v)) AS recordValue from tumble(source_mv_stream, t, 2s) group by i, window_end) select source_mv_versioned_kv.id as customerId, results.timestamp as timestamp, results.recordValue as recordValue, source_mv_versioned_kv.value as value from results inner join source_mv_versioned_kv on i = id settings max_threads=1, max_insert_threads=1"},
{"client":"python", "query_type": "table", "depends_on_stream":"target_mv_stream", "exist":"mv_with_target_stream", "exist_wait":2, "wait":1, "query":"create materialized view mv_with_target_stream into target_mv_stream as with results as (select i, format_datetime(window_end, '%Y-%m-%d %H:%i:%S', 'UTC') AS timestamp, to_string(max(v)) AS recordValue from tumble(source_mv_stream, t, 2s) group by i, window_end) select source_mv_versioned_kv.id as customerId, results.timestamp as timestamp, results.recordValue as recordValue, source_mv_versioned_kv.value as value from results inner join source_mv_versioned_kv on i = id settings max_threads=1, max_insert_threads=1"},
{"client":"python", "query_type": "stream", "query_id":"2400", "terminate": "manual", "wait":1, "query":"select (* except _tp_time) from mv_with_inner_stream union select (* except _tp_time) from mv_with_target_stream"},
Expand Down

0 comments on commit e795216

Please sign in to comment.