refactor: writer-id to node-id #25905

Merged 1 commit on Jan 23, 2025
2 changes: 1 addition & 1 deletion influxdb3/src/commands/create.rs
@@ -377,7 +377,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
"\
Token: {token}\n\
Hashed Token: {hashed}\n\n\
- Start the server with `influxdb3 serve --bearer-token {hashed} --object-store file --data-dir ~/.influxdb3 --writer-id YOUR_HOST_NAME`\n\n\
+ Start the server with `influxdb3 serve --bearer-token {hashed} --object-store file --data-dir ~/.influxdb3 --node-id YOUR_HOST_NAME`\n\n\
HTTP requests require the following header: \"Authorization: Bearer {token}\"\n\
This will grant you access to every HTTP endpoint or deny it otherwise
",
12 changes: 6 additions & 6 deletions influxdb3/src/commands/serve.rs
@@ -240,17 +240,17 @@ pub struct Config {
)]
pub buffer_mem_limit_mb: usize,

- /// The writer identifier used as a prefix in all object store file paths. This should be unique
+ /// The node identifier used as a prefix in all object store file paths. This should be unique
/// for any InfluxDB 3 Core servers that share the same object store configuration, i.e., the
/// same bucket.
#[clap(
- long = "writer-id",
+ long = "node-id",
// TODO: deprecate this alias in future version
alias = "host-id",
- env = "INFLUXDB3_WRITER_IDENTIFIER_PREFIX",
+ env = "INFLUXDB3_NODE_IDENTIFIER_PREFIX",
action
)]
- pub writer_identifier_prefix: String,
+ pub node_identifier_prefix: String,

/// The size of the in-memory Parquet cache in megabytes (MB).
#[clap(
@@ -419,7 +419,7 @@ pub async fn command(config: Config) -> Result<()> {
let num_cpus = num_cpus::get();
let build_malloc_conf = build_malloc_conf();
info!(
- writer_id = %config.writer_identifier_prefix,
+ node_id = %config.node_identifier_prefix,
git_hash = %INFLUXDB3_GIT_HASH as &str,
version = %INFLUXDB3_VERSION.as_ref() as &str,
uuid = %PROCESS_UUID.as_ref() as &str,
@@ -509,7 +509,7 @@ pub async fn command(config: Config) -> Result<()> {

let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
- config.writer_identifier_prefix,
+ config.node_identifier_prefix,
));
let wal_config = WalConfig {
gen1_duration: config.gen1_duration,
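Aside: the backward compatibility in the hunk above rests on clap's `alias`. A minimal sketch (not from this PR; the demo struct and values are hypothetical) of how an old flag spelling keeps parsing while the canonical name changes, mirroring the `--node-id` / `--host-id` pairing:

use clap::Parser; // clap v4 with the derive feature

#[derive(Debug, Parser)]
struct DemoConfig {
    // `long` sets the canonical flag; `alias` accepts the old spelling and
    // stores it in the same field, so both invocations below are equivalent.
    #[clap(long = "node-id", alias = "host-id")]
    node_identifier_prefix: String,
}

fn main() {
    let a = DemoConfig::parse_from(["demo", "--node-id", "my-node"]);
    let b = DemoConfig::parse_from(["demo", "--host-id", "my-node"]);
    assert_eq!(a.node_identifier_prefix, b.node_identifier_prefix);
}

Note that after this change `--writer-id` itself carries no alias, so only `--node-id` and the older `--host-id` still parse.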
6 changes: 3 additions & 3 deletions influxdb3/src/main.rs
@@ -55,7 +55,7 @@ long_about = r#"InfluxDB 3 Core server and command line tools

Examples:
# Run the InfluxDB 3 Core server
- influxdb3 serve --object-store file --data-dir ~/.influxdb3 --writer-id my_writer_name
+ influxdb3 serve --object-store file --data-dir ~/.influxdb3 --node-id my_node_name

# Display all commands short form
influxdb3 -h
@@ -64,10 +64,10 @@ Examples:
influxdb3 --help

# Run the InfluxDB 3 Core server with extra verbose logging
- influxdb3 serve -v --object-store file --data-dir ~/.influxdb3 --writer-id my_writer_name
+ influxdb3 serve -v --object-store file --data-dir ~/.influxdb3 --node-id my_node_name

# Run InfluxDB 3 Core with full debug logging specified with LOG_FILTER
- LOG_FILTER=debug influxdb3 serve --object-store file --data-dir ~/.influxdb3 --writer-id my_writer_name
+ LOG_FILTER=debug influxdb3 serve --object-store file --data-dir ~/.influxdb3 --node-id my_node_name
"#
)]
struct Config {
16 changes: 8 additions & 8 deletions influxdb3/tests/server/cli.rs
@@ -113,8 +113,8 @@ fn create_plugin_file(code: &str) -> NamedTempFile {
async fn test_telemetry_disabled_with_debug_msg() {
let serve_args = &[
"serve",
- "--writer-id",
- "the-best-writer",
+ "--node-id",
+ "the-best-node",
"--object-store",
"memory",
];
@@ -141,8 +141,8 @@ async fn test_telemetry_disabled_with_debug_msg() {
async fn test_telemetry_disabled() {
let serve_args = &[
"serve",
- "--writer-id",
- "the-best-writer",
+ "--node-id",
+ "the-best-node",
"--object-store",
"memory",
];
@@ -168,8 +168,8 @@ async fn test_telemetry_disabled() {
async fn test_telemetry_enabled_with_debug_msg() {
let serve_args = &[
"serve",
- "--writer-id",
- "the-best-writer",
+ "--node-id",
+ "the-best-node",
"--object-store",
"memory",
];
@@ -198,8 +198,8 @@ async fn test_telemetry_enabled_with_debug_msg() {
async fn test_telemetry_enabled() {
let serve_args = &[
"serve",
- "--writer-id",
- "the-best-writer",
+ "--node-id",
+ "the-best-node",
"--object-store",
"memory",
];
10 changes: 5 additions & 5 deletions influxdb3/tests/server/main.rs
@@ -47,7 +47,7 @@ trait ConfigProvider {
#[derive(Debug, Default)]
pub struct TestConfig {
auth_token: Option<(String, String)>,
- writer_id: Option<String>,
+ node_id: Option<String>,
plugin_dir: Option<String>,
// If None, use memory object store.
object_store_dir: Option<String>,
@@ -65,8 +65,8 @@ impl TestConfig {
}

/// Set a host identifier prefix on the spawned [`TestServer`]
- pub fn with_writer_id<S: Into<String>>(mut self, writer_id: S) -> Self {
- self.writer_id = Some(writer_id.into());
+ pub fn with_node_id<S: Into<String>>(mut self, node_id: S) -> Self {
+ self.node_id = Some(node_id.into());
self
}

@@ -92,8 +92,8 @@ impl ConfigProvider for TestConfig {
if let Some(plugin_dir) = &self.plugin_dir {
args.append(&mut vec!["--plugin-dir".to_string(), plugin_dir.to_owned()]);
}
- args.push("--writer-id".to_string());
- if let Some(host) = &self.writer_id {
+ args.push("--node-id".to_string());
+ if let Some(host) = &self.node_id {
args.push(host.to_owned());
} else {
args.push("test-server".to_string());
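For illustration, the renamed builder would be used in test setup roughly like this (a sketch only; `spawn()` and its return type are assumptions about the surrounding harness, not shown in this diff):

async fn demo() {
    // Hypothetical: configure a test server with an explicit node id.
    let server = TestConfig::default()
        .with_node_id("my-test-node")
        .spawn()
        .await;
    // ... issue requests against `server` ...
}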
4 changes: 2 additions & 2 deletions influxdb3_cache/src/last_cache/mod.rs
@@ -1349,9 +1349,9 @@ mod tests {
.insert(table_def.table_id, Arc::new(table_def));
// Create the catalog and clone its InnerCatalog (which is what the LastCacheProvider is
// initialized from):
- let writer_id = Arc::from("sample-host-id");
+ let node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("sample-instance-id");
- let catalog = Catalog::new(writer_id, instance_id);
+ let catalog = Catalog::new(node_id, instance_id);
let db_id = database.id;
catalog.insert_database(database);
let catalog = Arc::new(catalog);
44 changes: 22 additions & 22 deletions influxdb3_catalog/src/catalog.rs
@@ -195,9 +195,9 @@ impl Catalog {
/// Limit for the number of tables across all DBs that InfluxDB 3 Core OSS can have
pub(crate) const NUM_TABLES_LIMIT: usize = 2000;

- pub fn new(writer_id: Arc<str>, instance_id: Arc<str>) -> Self {
+ pub fn new(node_id: Arc<str>, instance_id: Arc<str>) -> Self {
Self {
- inner: RwLock::new(InnerCatalog::new(writer_id, instance_id)),
+ inner: RwLock::new(InnerCatalog::new(node_id, instance_id)),
}
}

@@ -304,8 +304,8 @@ impl Catalog {
Arc::clone(&self.inner.read().instance_id)
}

- pub fn writer_id(&self) -> Arc<str> {
- Arc::clone(&self.inner.read().writer_id)
+ pub fn node_id(&self) -> Arc<str> {
+ Arc::clone(&self.inner.read().node_id)
}

#[cfg(test)]
@@ -372,9 +372,9 @@ pub struct InnerCatalog {
/// The catalog is a map of databases with their table schemas
databases: SerdeVecMap<DbId, Arc<DatabaseSchema>>,
sequence: CatalogSequenceNumber,
- /// The `writer_id` is the prefix that is passed in when starting up
- /// (`writer_identifier_prefix`)
- writer_id: Arc<str>,
+ /// The `node_id` is the prefix that is passed in when starting up
+ /// (`node_identifier_prefix`)
+ node_id: Arc<str>,
/// The instance_id uniquely identifies the instance that generated the catalog
instance_id: Arc<str>,
/// If true, the catalog has been updated since the last time it was serialized
@@ -438,11 +438,11 @@ serde_with::serde_conv!(
);

impl InnerCatalog {
- pub(crate) fn new(writer_id: Arc<str>, instance_id: Arc<str>) -> Self {
+ pub(crate) fn new(node_id: Arc<str>, instance_id: Arc<str>) -> Self {
Self {
databases: SerdeVecMap::new(),
sequence: CatalogSequenceNumber::new(0),
- writer_id,
+ node_id,
instance_id,
updated: false,
db_map: BiHashMap::new(),
@@ -1428,10 +1428,10 @@ mod tests {

#[test]
fn catalog_serialization() {
- let writer_id = Arc::from("sample-host-id");
+ let node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("instance-id");
let cloned_instance_id = Arc::clone(&instance_id);
- let catalog = Catalog::new(writer_id, cloned_instance_id);
+ let catalog = Catalog::new(node_id, cloned_instance_id);
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test_db".into(),
@@ -1541,7 +1541,7 @@
]
],
"sequence": 0,
- "writer_id": "test",
+ "node_id": "test",
"instance_id": "test",
"db_map": []
}"#;
@@ -1587,7 +1587,7 @@
]
],
"sequence": 0,
- "writer_id": "test",
+ "node_id": "test",
"instance_id": "test",
"db_map": []
}"#;
@@ -1700,9 +1700,9 @@

#[test]
fn serialize_series_keys() {
- let writer_id = Arc::from("sample-host-id");
+ let node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("instance-id");
- let catalog = Catalog::new(writer_id, instance_id);
+ let catalog = Catalog::new(node_id, instance_id);
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test_db".into(),
@@ -1757,9 +1757,9 @@

#[test]
fn serialize_last_cache() {
- let writer_id = Arc::from("sample-host-id");
+ let node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("instance-id");
- let catalog = Catalog::new(writer_id, instance_id);
+ let catalog = Catalog::new(node_id, instance_id);
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test_db".into(),
@@ -1823,14 +1823,14 @@
}

#[test]
- fn catalog_instance_and_writer_ids() {
- let writer_id = Arc::from("sample-host-id");
+ fn catalog_instance_and_node_ids() {
+ let node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("sample-instance-id");
- let cloned_writer_id = Arc::clone(&writer_id);
+ let cloned_node_id = Arc::clone(&node_id);
let cloned_instance_id = Arc::clone(&instance_id);
- let catalog = Catalog::new(cloned_writer_id, cloned_instance_id);
+ let catalog = Catalog::new(cloned_node_id, cloned_instance_id);
assert_eq!(instance_id, catalog.instance_id());
- assert_eq!(writer_id, catalog.writer_id());
+ assert_eq!(node_id, catalog.node_id());
}

/// See: https://github.com/influxdata/influxdb/issues/25524
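The persisted catalog field changes in step, from "writer_id" to "node_id", as the JSON above and the snapshot updates below show. Purely as an illustration (the visible diff performs a straight rename; no compatibility shim is shown), serde's field alias is one way to keep documents written under the old key deserializable across such a rename:

use serde::Deserialize;

// Illustrative subset of the catalog's serialized fields; requires the
// serde (with derive) and serde_json crates.
#[derive(Deserialize)]
struct CatalogSubset {
    // `alias` lets JSON written with the old "writer_id" key still load.
    #[serde(alias = "writer_id")]
    node_id: String,
}

fn main() {
    let old = r#"{"writer_id": "sample-host-id"}"#;
    let new = r#"{"node_id": "sample-host-id"}"#;
    let a: CatalogSubset = serde_json::from_str(old).unwrap();
    let b: CatalogSubset = serde_json::from_str(new).unwrap();
    assert_eq!(a.node_id, b.node_id);
}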
Updated snapshot (file name not shown in this view):
@@ -269,7 +269,7 @@ expression: catalog
]
],
"sequence": 0,
- "writer_id": "sample-host-id",
+ "node_id": "sample-host-id",
"instance_id": "instance-id",
"db_map": []
}
Updated snapshot (file name not shown in this view):
@@ -120,7 +120,7 @@ expression: catalog
]
],
"sequence": 0,
- "writer_id": "sample-host-id",
+ "node_id": "sample-host-id",
"instance_id": "instance-id",
"db_map": []
}
Updated snapshot (file name not shown in this view):
@@ -104,7 +104,7 @@ expression: catalog
]
],
"sequence": 0,
- "writer_id": "sample-host-id",
+ "node_id": "sample-host-id",
"instance_id": "instance-id",
"db_map": []
}
4 changes: 2 additions & 2 deletions influxdb3_server/src/lib.rs
@@ -760,9 +760,9 @@ mod tests {
DedicatedExecutor::new_testing(),
));
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
- let sample_writer_id = Arc::from("sample-host-id");
+ let sample_node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("sample-instance-id");
- let catalog = Arc::new(Catalog::new(sample_writer_id, instance_id));
+ let catalog = Arc::new(Catalog::new(sample_node_id, instance_id));
let write_buffer_impl = influxdb3_write::write_buffer::WriteBufferImpl::new(
influxdb3_write::write_buffer::WriteBufferImplArgs {
persister: Arc::clone(&persister),
4 changes: 2 additions & 2 deletions influxdb3_server/src/query_executor/mod.rs
@@ -821,9 +821,9 @@ mod tests {
);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let exec = make_exec(Arc::clone(&object_store));
- let writer_id = Arc::from("sample-host-id");
+ let node_id = Arc::from("sample-host-id");
let instance_id = Arc::from("instance-id");
- let catalog = Arc::new(Catalog::new(writer_id, instance_id));
+ let catalog = Arc::new(Catalog::new(node_id, instance_id));
let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog: Arc::clone(&catalog),