Skip to content

Commit

Permalink
feat!: reorganize the storage layout (#1609)
Browse files Browse the repository at this point in the history
* feat: adds data_home to DataOptions

* refactor: split out object store stuffs from datanode instance

* feat: move data_home into FileConfig

* refactor: object storage layers

* feat: adds datanode path to procedure paths

* feat: temp commit

* refactor: clean code

* fix: forgot files

* fix: forgot files

* Update src/common/test-util/src/ports.rs

Co-authored-by: Yingwen <[email protected]>

* Update tests/runner/src/env.rs

Co-authored-by: Yingwen <[email protected]>

* fix: compile error

* chore: cr comments

* fix: dependencies order in cargo

* fix: data path in test

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
killme2008 and evenyag authored May 23, 2023
1 parent 5b304fa commit 7c55783
Show file tree
Hide file tree
Showing 40 changed files with 614 additions and 340 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ tcp_nodelay = true

# WAL options, see `standalone.example.toml`.
[wal]
dir = "/tmp/greptimedb/wal"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
Expand All @@ -34,7 +35,7 @@ sync_write = false
# Storage options, see `standalone.example.toml`.
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"

# Compaction options, see `standalone.example.toml`.
[storage.compaction]
Expand Down
6 changes: 3 additions & 3 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ addr = "127.0.0.1:4004"

# WAL options.
[wal]
# WAL data directory.
dir = "/tmp/greptimedb/wal"
# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
file_size = "1GB"
# WAL purge threshold in bytes.
Expand All @@ -96,7 +96,7 @@ sync_write = false
# Storage type.
type = "File"
# Data directory, "/tmp/greptimedb/data" by default.
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"

# Compaction options.
[storage.compaction]
Expand Down
22 changes: 11 additions & 11 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct StartCommand {
#[clap(short, long)]
config_file: Option<String>,
#[clap(long)]
data_dir: Option<String>,
data_home: Option<String>,
#[clap(long)]
wal_dir: Option<String>,
#[clap(long)]
Expand Down Expand Up @@ -147,14 +147,14 @@ impl StartCommand {
.fail();
}

if let Some(data_dir) = &self.data_dir {
if let Some(data_home) = &self.data_home {
opts.storage.store = ObjectStoreConfig::File(FileConfig {
data_dir: data_dir.clone(),
data_home: data_home.clone(),
});
}

if let Some(wal_dir) = &self.wal_dir {
opts.wal.dir = wal_dir.clone();
opts.wal.dir = Some(wal_dir.clone());
}

if let Some(http_addr) = &self.http_addr {
Expand Down Expand Up @@ -214,7 +214,7 @@ mod tests {
tcp_nodelay = true
[wal]
dir = "/tmp/greptimedb/wal"
dir = "/other/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
Expand All @@ -223,7 +223,7 @@ mod tests {
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"
[storage.compaction]
max_inflight_tasks = 3
Expand Down Expand Up @@ -255,6 +255,7 @@ mod tests {
assert_eq!(2, options.mysql_runtime_size);
assert_eq!(Some(42), options.node_id);

assert_eq!("/other/wal", options.wal.dir.unwrap());
assert_eq!(Duration::from_secs(600), options.wal.purge_interval);
assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0);
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
Expand All @@ -273,8 +274,8 @@ mod tests {
assert!(tcp_nodelay);

match &options.storage.store {
ObjectStoreConfig::File(FileConfig { data_dir, .. }) => {
assert_eq!("/tmp/greptimedb/data/", data_dir)
ObjectStoreConfig::File(FileConfig { data_home, .. }) => {
assert_eq!("/tmp/greptimedb/", data_home)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
Expand Down Expand Up @@ -374,7 +375,6 @@ mod tests {
tcp_nodelay = true
[wal]
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
Expand All @@ -383,7 +383,7 @@ mod tests {
[storage]
type = "File"
data_dir = "/tmp/greptimedb/data/"
data_home = "/tmp/greptimedb/"
[storage.compaction]
max_inflight_tasks = 3
Expand Down Expand Up @@ -464,7 +464,7 @@ mod tests {
assert_eq!(opts.storage.compaction.max_purge_tasks, 32);

// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.wal.dir, "/other/wal/dir");
assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir");

// Should be default value.
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ mod tests {
);

// Should be the values from config file, not environment variables.
assert_eq!(opts.wal.dir, "/tmp/greptimedb/wal".to_string());
assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal");

// Should be default values.
assert_eq!(opts.node_id, None);
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ mod tests {
);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir);
assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
match &dn_opts.storage.store {
datanode::datanode::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mod tests {
#[ignore]
#[test]
fn test_repl() {
let data_dir = create_temp_dir("data");
let data_home = create_temp_dir("data");
let wal_dir = create_temp_dir("wal");

let mut bin_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
Expand All @@ -65,7 +65,7 @@ mod tests {
"start",
"--rpc-addr=0.0.0.0:4321",
"--node-id=1",
&format!("--data-dir={}", data_dir.path().display()),
&format!("--data-home={}", data_home.path().display()),
&format!("--wal-dir={}", wal_dir.path().display()),
])
.stdout(Stdio::null())
Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod bit_vec;
pub mod buffer;
pub mod bytes;
pub mod paths;
#[allow(clippy::all)]
pub mod readable_size;

Expand Down
25 changes: 25 additions & 0 deletions src/common/base/src/paths.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Path constants for table engines, cluster states and WAL
/// All paths relative to data_home(file storage) or root path(S3, OSS etc).
/// WAL dir for local file storage
pub const WAL_DIR: &str = "wal/";

/// Data dir for table engines
pub const DATA_DIR: &str = "data/";

/// Cluster state dir
pub const CLUSTER_DIR: &str = "cluster/";
22 changes: 14 additions & 8 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl ManagerContext {
/// Config for [LocalManager].
#[derive(Debug)]
pub struct ManagerConfig {
pub parent_path: String,
pub max_retry_times: usize,
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
Expand All @@ -352,6 +353,7 @@ pub struct ManagerConfig {
impl Default for ManagerConfig {
fn default() -> Self {
Self {
parent_path: "".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
Expand All @@ -363,7 +365,7 @@ impl Default for ManagerConfig {
/// A [ProcedureManager] that maintains procedure states locally.
pub struct LocalManager {
manager_ctx: Arc<ManagerContext>,
state_store: StateStoreRef,
procedure_store: Arc<ProcedureStore>,
max_retry_times: usize,
retry_delay: Duration,
remove_outdated_meta_task: RepeatedTask<Error>,
Expand All @@ -382,7 +384,7 @@ impl LocalManager {
);
LocalManager {
manager_ctx,
state_store,
procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)),
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
remove_outdated_meta_task,
Expand All @@ -405,7 +407,7 @@ impl LocalManager {
exponential_builder: ExponentialBuilder::default()
.with_min_delay(self.retry_delay)
.with_max_times(self.max_retry_times),
store: ProcedureStore::new(self.state_store.clone()),
store: self.procedure_store.clone(),
rolling_back: false,
};

Expand Down Expand Up @@ -466,8 +468,7 @@ impl ProcedureManager for LocalManager {
logging::info!("LocalManager start to recover");
let recover_start = Instant::now();

let procedure_store = ProcedureStore::new(self.state_store.clone());
let (messages, finished_ids) = procedure_store.load_messages().await?;
let (messages, finished_ids) = self.procedure_store.load_messages().await?;

for (procedure_id, message) in &messages {
if message.parent_id.is_none() {
Expand Down Expand Up @@ -502,7 +503,7 @@ impl ProcedureManager for LocalManager {
);

for procedure_id in finished_ids {
if let Err(e) = procedure_store.delete_procedure(procedure_id).await {
if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
logging::error!(e; "Failed to delete procedure {}", procedure_id);
}
}
Expand Down Expand Up @@ -571,7 +572,7 @@ mod tests {

use super::*;
use crate::error::Error;
use crate::store::ObjectStateStore;
use crate::store::state_store::ObjectStateStore;
use crate::{Context, Procedure, Status};

#[test]
Expand Down Expand Up @@ -680,6 +681,7 @@ mod tests {
fn test_register_loader() {
let dir = create_temp_dir("register");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
Expand All @@ -702,6 +704,7 @@ mod tests {
let dir = create_temp_dir("recover");
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
Expand All @@ -714,7 +717,7 @@ mod tests {
.unwrap();

// Prepare data
let procedure_store = ProcedureStore::from(object_store.clone());
let procedure_store = ProcedureStore::from_object_store(object_store.clone());
let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager"));
let root_id = ProcedureId::random();
// Prepare data for the root procedure.
Expand Down Expand Up @@ -749,6 +752,7 @@ mod tests {
async fn test_submit_procedure() {
let dir = create_temp_dir("submit");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
Expand Down Expand Up @@ -798,6 +802,7 @@ mod tests {
async fn test_state_changed_on_err() {
let dir = create_temp_dir("on_err");
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
Expand Down Expand Up @@ -860,6 +865,7 @@ mod tests {
let dir = create_temp_dir("remove_outdated_meta_task");
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
parent_path: "data/".to_string(),
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
Expand Down
Loading

0 comments on commit 7c55783

Please sign in to comment.