Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename sqd_node to sqd_hotblocks #14

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ members = [
"crates/data-core",
"crates/data-source",
"crates/dataset",
"crates/node",
"crates/node-example",
"crates/hotblocks",
"crates/hotblocks-example",
"crates/polars",
"crates/primitives",
"crates/query",
Expand Down
14 changes: 7 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ ADD Cargo.lock .
ADD crates crates


FROM builder AS node-builder
RUN cargo build -p sqd-node-example --release
FROM builder AS hotblocks-builder
RUN cargo build -p sqd-hotblocks-example --release


FROM rust AS node-example
FROM rust AS hotblocks-example
WORKDIR /app
COPY --from=node-builder /app/target/release/sqd-node-example .
RUN mkdir -p /etc/sqd/node && echo "{}" > /etc/sqd/node/datasets.json
VOLUME /var/sqd/node
ENTRYPOINT ["/app/sqd-node-example", "--db", "/var/sqd/node/db", "--datasets", "/etc/sqd/node/datasets.json"]
COPY --from=hotblocks-builder /app/target/release/sqd-hotblocks-example .
RUN mkdir -p /etc/sqd/hotblocks && echo "{}" > /etc/sqd/hotblocks/datasets.json
VOLUME /var/sqd/hotblocks
ENTRYPOINT ["/app/sqd-hotblocks-example", "--db", "/var/sqd/hotblocks/db", "--datasets", "/etc/sqd/hotblocks/datasets.json"]
EXPOSE 3000
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "sqd-node-example"
name = "sqd-hotblocks-example"
version = "0.1.0"
edition = "2021"

Expand All @@ -11,7 +11,7 @@ clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sqd-node = { path = "../node" }
sqd-hotblocks = { path = "../hotblocks" }
sqd-polars = { path = "../polars" }
sqd-primitives = { path = "../primitives", features = ["serde"] }
sqd-storage = { path = "../storage" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,27 @@ use axum::routing::{get, post};
use axum::{BoxError, Extension, Json, Router};
use futures::TryStream;
use serde::Serialize;
use sqd_node::{Node, Query, QueryResponse};
use sqd_hotblocks::{HotblocksServer, Query, QueryResponse};
use sqd_primitives::BlockRef;
use sqd_storage::db::DatasetId;
use std::sync::Arc;


type NodeRef = Arc<Node>;
type HotblocksRef = Arc<HotblocksServer>;


pub fn build_app(node: NodeRef) -> Router {
pub fn build_app(hotblocks: HotblocksRef) -> Router {
Router::new()
.route("/", get(|| async { "Welcome to SQD hot block data service!" }))
.route("/datasets/{id}/stream", post(stream))
.route("/datasets/{id}/finalized-head", get(get_finalized_head))
.route("/datasets/{id}/head", get(get_head))
.layer(Extension(node))
.layer(Extension(hotblocks))
}


async fn stream(
Extension(node): Extension<NodeRef>,
Extension(hotblocks): Extension<HotblocksRef>,
Path(dataset_id): Path<DatasetId>,
Json(query): Json<Query>
) -> Response
Expand All @@ -36,7 +36,7 @@ async fn stream(
return (StatusCode::BAD_REQUEST, format!("{}", err)).into_response()
}

match node.query(dataset_id, query).await {
match hotblocks.query(dataset_id, query).await {
Ok(stream) => {
let mut res = Response::builder()
.status(200)
Expand Down Expand Up @@ -69,7 +69,7 @@ fn stream_query_response(mut stream: QueryResponse) -> impl TryStream<Ok=Bytes,


fn error_to_response(err: anyhow::Error) -> Response {
if let Some(above_the_head) = err.downcast_ref::<sqd_node::error::QueryIsAboveTheHead>() {
if let Some(above_the_head) = err.downcast_ref::<sqd_hotblocks::error::QueryIsAboveTheHead>() {
let mut res = Response::builder().status(204);
if let Some(head) = above_the_head.finalized_head.as_ref() {
res = res.header("x-sqd-finalized-head-number", head.number);
Expand All @@ -78,7 +78,7 @@ fn error_to_response(err: anyhow::Error) -> Response {
return res.body(Body::empty()).unwrap()
}

if let Some(fork) = err.downcast_ref::<sqd_node::error::UnexpectedBaseBlock>() {
if let Some(fork) = err.downcast_ref::<sqd_hotblocks::error::UnexpectedBaseBlock>() {
return (
StatusCode::CONFLICT,
Json(BaseBlockConflict {
Expand All @@ -87,13 +87,13 @@ fn error_to_response(err: anyhow::Error) -> Response {
).into_response()
}

let status_code = if err.is::<sqd_node::error::UnknownDataset>() {
let status_code = if err.is::<sqd_hotblocks::error::UnknownDataset>() {
StatusCode::NOT_FOUND
} else if err.is::<sqd_node::error::QueryKindMismatch>() {
} else if err.is::<sqd_hotblocks::error::QueryKindMismatch>() {
StatusCode::BAD_REQUEST
} else if err.is::<sqd_node::error::BlockRangeMissing>() {
} else if err.is::<sqd_hotblocks::error::BlockRangeMissing>() {
StatusCode::BAD_REQUEST
} else if err.is::<sqd_node::error::Busy>() {
} else if err.is::<sqd_hotblocks::error::Busy>() {
StatusCode::SERVICE_UNAVAILABLE
} else {
StatusCode::INTERNAL_SERVER_ERROR
Expand All @@ -117,23 +117,23 @@ struct BaseBlockConflict<'a> {


async fn get_finalized_head(
Extension(node): Extension<NodeRef>,
Extension(hotblocks): Extension<HotblocksRef>,
Path(dataset_id): Path<DatasetId>
) -> Response
{
match node.get_finalized_head(dataset_id) {
match hotblocks.get_finalized_head(dataset_id) {
Ok(head) => (StatusCode::OK, Json(head)).into_response(),
Err(err) => (StatusCode::NOT_FOUND, format!("{}", err)).into_response()
}
}


async fn get_head(
Extension(node): Extension<NodeRef>,
Extension(hotblocks): Extension<HotblocksRef>,
Path(dataset_id): Path<DatasetId>
) -> Response
{
match node.get_head(dataset_id) {
match hotblocks.get_head(dataset_id) {
Ok(head) => (StatusCode::OK, Json(head)).into_response(),
Err(err) => (StatusCode::NOT_FOUND, format!("{}", err)).into_response()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::dataset_config::DatasetConfig;
use anyhow::Context;
use clap::Parser;
use sqd_node::{DBRef, Node, NodeBuilder};
use sqd_hotblocks::{DBRef, HotblocksServer, HotblocksServerBuilder};
use sqd_storage::db::DatabaseSettings;
use std::sync::Arc;

Expand All @@ -27,7 +27,7 @@ pub struct CLI {


impl CLI {
pub fn build_node(&self) -> anyhow::Result<(Arc<Node>, DBRef)> {
pub fn build_server(&self) -> anyhow::Result<(Arc<HotblocksServer>, DBRef)> {
let datasets = DatasetConfig::read_config_file(&self.datasets)
.context("failed to read datasets config")?;

Expand All @@ -37,7 +37,7 @@ impl CLI {
.map(Arc::new)
.context("failed to open rocksdb database")?;

let mut builder = NodeBuilder::new(db.clone());
let mut builder = HotblocksServerBuilder::new(db.clone());

for (id, cfg) in datasets {
let ds = builder.add_dataset(cfg.kind, id, cfg.retention);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use sqd_node::{DatasetKind, RetentionStrategy};
use sqd_hotblocks::{DatasetKind, RetentionStrategy};
use sqd_storage::db::DatasetId;
use std::collections::BTreeMap;
use url::Url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod app;
use crate::app::build_app;
use crate::cli::CLI;
use clap::Parser;
use sqd_node::DBRef;
use sqd_hotblocks::DBRef;
use std::time::Duration;
use tokio::signal;
use tower_http::timeout::TimeoutLayer;
Expand Down Expand Up @@ -40,11 +40,11 @@ fn main() -> anyhow::Result<()> {
.enable_all()
.build()?
.block_on(async {
let (node, db) = args.build_node()?;
let (hotblocks, db) = args.build_server()?;

tokio::spawn(db_cleanup_task(db));

let app = build_app(node).layer(TimeoutLayer::new(Duration::from_secs(10)));
let app = build_app(hotblocks).layer(TimeoutLayer::new(Duration::from_secs(10)));

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;

Expand Down
2 changes: 1 addition & 1 deletion crates/node/Cargo.toml → crates/hotblocks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "sqd-node"
name = "sqd-hotblocks"
version = "0.1.0"
edition = "2021"

Expand Down
2 changes: 1 addition & 1 deletion crates/node/src/error.rs → crates/hotblocks/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct Busy;

impl Display for Busy {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "node is busy")
write!(f, "hotblocks server is busy")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
mod node;
mod node_builder;
mod query_executor;
mod query_response;
mod server;
mod server_builder;


pub use node::*;
pub use node_builder::*;
pub use query_response::QueryResponse;
pub use server::*;
pub use server_builder::*;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::Busy;
use crate::node::query_executor::QueryExecutorRef;
use crate::hotblocks::query_executor::QueryExecutorRef;
use crate::query::runner::QueryRunner;
use crate::types::DBRef;
use anyhow::bail;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::error::{QueryIsAboveTheHead, QueryKindMismatch, UnknownDataset};
use crate::ingest::DatasetController;
use crate::node::node_builder::NodeBuilder;
use crate::node::query_executor::{QueryExecutor, QueryExecutorRef};
use crate::node::query_response::QueryResponse;
use crate::hotblocks::server_builder::HotblocksServerBuilder;
use crate::hotblocks::query_executor::{QueryExecutor, QueryExecutorRef};
use crate::hotblocks::query_response::QueryResponse;
use crate::types::{DBRef, DatasetKind, RetentionStrategy};
use anyhow::{bail, ensure};
use futures::stream::FuturesUnordered;
Expand All @@ -17,16 +17,16 @@ use std::time::Duration;
use tracing::{error, info};


pub struct Node {
pub struct HotblocksServer {
executor: QueryExecutorRef,
db: DBRef,
datasets: HashMap<DatasetId, Arc<DatasetController>>,
ingest_handle: tokio::task::JoinHandle<()>
}


impl Node {
pub(super) fn new(builder: NodeBuilder) -> Self {
impl HotblocksServer {
pub(super) fn new(builder: HotblocksServerBuilder) -> Self {
let datasets: HashMap<_, _> = builder.datasets.into_iter().map(|cfg| {
let first_block = match cfg.retention {
RetentionStrategy::FromBlock(first_block) => first_block,
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Node {
}


impl Drop for Node {
impl Drop for HotblocksServer {
fn drop(&mut self) {
self.ingest_handle.abort()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::ingest::DataSource;
use crate::node::node::Node;
use crate::hotblocks::server::HotblocksServer;
use crate::types::{DBRef, DatasetKind, RetentionStrategy};
use reqwest::IntoUrl;
use sqd_storage::db::DatasetId;
Expand Down Expand Up @@ -27,15 +27,15 @@ impl DatasetConfig {
}


pub struct NodeBuilder {
pub struct HotblocksServerBuilder {
pub(super) db: DBRef,
pub(super) datasets: Vec<DatasetConfig>,
pub(super) default_http_client: reqwest::Client,
pub(super) max_pending_query_tasks: usize
}


impl NodeBuilder {
impl HotblocksServerBuilder {
pub fn new(db: DBRef) -> Self {
Self {
db,
Expand Down Expand Up @@ -65,7 +65,7 @@ impl NodeBuilder {
self.datasets.last_mut().unwrap()
}

pub fn build(self) -> Node {
Node::new(self)
pub fn build(self) -> HotblocksServer {
HotblocksServer::new(self)
}
}
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions crates/node/src/lib.rs → crates/hotblocks/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#![allow(unused)]
pub mod error;
mod ingest;
mod node;
mod hotblocks;
mod query;
mod types;


pub use node::*;
pub use hotblocks::*;
pub use types::{DBRef, DatasetKind, RetentionStrategy};
pub use sqd_query::Query;
File renamed without changes.
File renamed without changes.
File renamed without changes.