From 6c30c3988f6bdc993ed61828b23bd977da7321e5 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 14 Jan 2025 11:34:19 +0800 Subject: [PATCH] feat: init vortex (#6) Signed-off-by: Gaius --- Cargo.lock | 161 +++++++++++++++++++++++++++ Cargo.toml | 3 + docs/README.md | 32 +++--- src/error/mod.rs | 46 ++++++++ src/lib.rs | 229 +++++++++++++++++++++++++++++++++++++- src/tlv/download_piece.rs | 162 +++++++++++++++++++++++++++ src/tlv/error.rs | 212 +++++++++++++++++++++++++++++++++++ src/tlv/mod.rs | 88 +++++++++++++++ src/tlv/piece_content.rs | 97 ++++++++++++++++ 9 files changed, 1009 insertions(+), 21 deletions(-) create mode 100644 src/error/mod.rs create mode 100644 src/tlv/download_piece.rs create mode 100644 src/tlv/error.rs create mode 100644 src/tlv/mod.rs create mode 100644 src/tlv/piece_content.rs diff --git a/Cargo.lock b/Cargo.lock index 4a348d6..5caad7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,167 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "libc" +version = "0.2.169" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" + +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "syn" +version = "2.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" + [[package]] name = "vortex" version = "0.1.0" +dependencies = [ + "bytes", + "rand", + "thiserror", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 0255b94..a98fb50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,6 @@ readme = "README.md" edition = "2021" [dependencies] +thiserror = "1.0" +rand = "0.8" +bytes = "1" diff --git a/docs/README.md b/docs/README.md index 3a0a42c..cb58c20 100644 --- a/docs/README.md +++ b/docs/README.md @@ -7,44 +7,44 @@ scalable file sharing capabilities. ## Protocol Fields - **Packet Identifier (8 bits):** Uniquely identifies each packet. -- **Tag (T, 7 bits):** Specifies the type of data in the value field. -- **Length (L, 30 bits):** Indicates the length (in bytes) of the Value field, supporting up to 1GiB of data. +- **Tag (T, 8 bits):** Specifies the type of data in the value field. +- **Length (L, 32 bits):** Indicates the length (in bytes) of the Value field, supporting up to 4GiB of data. - **Value (V, variable length):** The actual data, up to 1GiB. ## Tag Definitions -| Tag | Name | Description | -| ----- | ---------------- | -------------------------------------------------------------------------------------------------------------- | -| 0 | Piece Identifier | Composed of `{Task ID}-{Piece ID}`, where the Task ID is a 32-byte SHA-256 value and the Piece ID is a number. | -| 1 | Piece Content | The content of a piece, with a maximum size of 1 GiB per piece. | -| 2-126 | Reserved | Reserved for future use. | -| 127 | Error | Error message. | +| Tag | Name | Description | +| ----- | -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 0 | Download Piece | Download the content of a piece from a peer. It is composed of `{Task ID}-{Piece ID}`, where the Task ID is a 32-byte SHA-256 value and the Piece ID is a number. | +| 1 | Piece Content | The content of a piece, with a maximum size of 1 GiB per piece. | +| 2-254 | Reserved | Reserved for future use. | +| 255 | Error | Error message. | ## Packet Format -| Packet ID (8 bits) | Tag (7 bits) | Length (30 bits) | Value (up to 1GiB) | +| Packet ID (8 bits) | Tag (8 bits) | Length (32 bits) | Value (up to 4GiB) | | ------------------ | ------------ | ---------------- | ------------------ | -| 8-bit | 7-bit | 30-bit | variable | +| 8-bit | 8-bit | 32-bit | variable | - **Packet ID:** 8-bit unsigned integer. -- **Tag:** 7-bit field describing the content type. -- **Length:** 30-bit field specifying the size of the Value. +- **Tag:** 8-bit field describing the content type. +- **Length:** 32-bit field specifying the size of the Value. - **Value:** Actual data, size determined by Length. ## Behavior -- **Piece ID (Tag=0x00):** Identifies the specific piece of the file. +- **Download Piece (Tag=0x00):** Download the content of a piece from a peer. - **Piece Content (Tag=0x01):** Raw piece data or piece fragments. -- **Error (Tag=0x7F):** Conveys error. -- **Reserved Tags:** May be allocated for metadata, compression, encryption, or future protocol extensions. +- **Error (Tag=0xFF):** Conveys error. +- **Reserved Tags:** Tags 2-254 may be allocated for metadata, compression, encryption, or future protocol extensions. ## Example - **Packet ID:** 0x12 -- **Tag:** 0x00 (piece content) +- **Tag:** 0x00 (Download Piece) - **Length:** 10 (indicating "HelloWorld" is 10 bytes) - **Value:** "HelloWorld" diff --git a/src/error/mod.rs b/src/error/mod.rs new file mode 100644 index 0000000..a2c77cf --- /dev/null +++ b/src/error/mod.rs @@ -0,0 +1,46 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Error is the error type for the Vortex protocol. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// InvalidPacket indicates an invalid Vortex packet. + #[error("invalid vortex packet, cause: {0}")] + InvalidPacket(String), + + /// InvalidLength indicates an invalid length. + #[error("invalid length, cause: {0}")] + InvalidLength(String), + + /// TryFromSliceError indicates a conversion error. + #[error(transparent)] + TryFromSliceError(#[from] std::array::TryFromSliceError), + + /// Utf8Error indicates a conversion error. + #[error(transparent)] + Utf8Error(#[from] std::str::Utf8Error), + + /// ParseIntError indicates a conversion error. + #[error(transparent)] + ParseIntError(#[from] std::num::ParseIntError), + + /// FromUtf8Error indicates a conversion error. + #[error(transparent)] + FromUtf8Error(#[from] std::string::FromUtf8Error), +} + +/// Result is the result type for the Vortex protocol. +pub type Result = std::result::Result; diff --git a/src/lib.rs b/src/lib.rs index 2d43b0c..f92ca74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,25 @@ * limitations under the License. */ +use crate::error::{Error, Result}; +use bytes::{BufMut, Bytes, BytesMut}; +use rand::prelude::*; + +pub mod error; +pub mod tlv; + +/// HEADER_SIZE is the size of the Vortex packet header including the packet identifier, tag, and +/// length. +const HEADER_SIZE: usize = 6; + +/// Header represents the Vortex packet header. +#[derive(Debug)] +pub struct Header { + packet_id: u8, + tag: tlv::Tag, + length: usize, +} + /// Vortex Protocol /// /// Vortex is a peer-to-peer (P2P) file transfer protocol using TLV (Tag-Length-Value) format for @@ -21,8 +40,8 @@ /// /// Packet Format: /// - Packet Identifier (8 bits): Uniquely identifies each packet -/// - Tag (7 bits): Specifies data type in value field -/// - Length (30 bits): Indicates Value field length, up to 1 GiB +/// - Tag (8 bits): Specifies data type in value field +/// - Length (32 bits): Indicates Value field length, up to 4 GiB /// - Value (variable): Actual data content, maximum 1 GiB /// /// Protocol Format: @@ -30,11 +49,211 @@ /// ```text /// ------------------------------------------------------------------------------------------------- /// | | | | | -/// | Packet Identifier (8 bits) | Tag (7 bits) | Length (30 bits) | Value (up to 1 GiB) | +/// | Packet Identifier (8 bits) | Tag (8 bits) | Length (32 bits) | Value (up to 4 GiB) | /// | | | | | /// ------------------------------------------------------------------------------------------------- /// ``` /// /// For more information, please refer to the [Vortex Protocol](https://github.com/dragonflyoss/vortex/blob/main/docs/README.md). -#[derive(Debug, Clone)] -pub struct Vortex {} +#[derive(Debug)] +pub enum Vortex { + DownloadPiece(Header, tlv::download_piece::DownloadPiece), + PieceContent(Header, tlv::piece_content::PieceContent), + Reserved(Header), + Error(Header, tlv::error::Error), +} + +/// Vortex implements the Vortex functions. +impl Vortex { + /// Creates a new Vortex packet. + pub fn new(tag: tlv::Tag, value: Bytes) -> Result { + let mut rng = rand::thread_rng(); + let header = Header { + packet_id: rng.gen(), + tag, + length: value.len(), + }; + + match tag { + tlv::Tag::DownloadPiece => { + let download_piece = tlv::download_piece::DownloadPiece::from_bytes(value)?; + Ok(Vortex::DownloadPiece(header, download_piece)) + } + tlv::Tag::PieceContent => { + let piece_content = tlv::piece_content::PieceContent::from_bytes(value)?; + Ok(Vortex::PieceContent(header, piece_content)) + } + tlv::Tag::Reserved(_) => Ok(Vortex::Reserved(header)), + tlv::Tag::Error => { + let err = tlv::error::Error::from_bytes(value)?; + Ok(Vortex::Error(header, err)) + } + } + } + + /// packet_id returns the packet identifier of the Vortex packet. + #[inline] + pub fn packet_id(&self) -> u8 { + match self { + Vortex::DownloadPiece(header, _) => header.packet_id, + Vortex::PieceContent(header, _) => header.packet_id, + Vortex::Reserved(header) => header.packet_id, + Vortex::Error(header, _) => header.packet_id, + } + } + + /// tag returns the tag of the Vortex packet. + #[inline] + pub fn tag(&self) -> &tlv::Tag { + match self { + Vortex::DownloadPiece(header, _) => &header.tag, + Vortex::PieceContent(header, _) => &header.tag, + Vortex::Reserved(header) => &header.tag, + Vortex::Error(header, _) => &header.tag, + } + } + + /// length returns the length of the value field. + #[inline] + pub fn length(&self) -> usize { + match self { + Vortex::DownloadPiece(header, _) => header.length, + Vortex::PieceContent(header, _) => header.length, + Vortex::Reserved(header) => header.length, + Vortex::Error(header, _) => header.length, + } + } + + /// from_bytes creates a Vortex packet from a byte slice. + pub fn from_bytes(bytes: Bytes) -> Result { + if bytes.len() < HEADER_SIZE { + return Err(Error::InvalidPacket(format!( + "expected min {HEADER_SIZE} bytes, got {}", + bytes.len() + ))); + } + + let mut bytes = BytesMut::from(bytes); + let header = bytes.split_to(HEADER_SIZE); + let value = bytes; + let packet_id = header[0]; + let tag = header[1] + .try_into() + .map_err(|err| Error::InvalidPacket(format!("invalid tag value: {:?}", err)))?; + let length = u32::from_be_bytes(header[2..HEADER_SIZE].try_into()?) as usize; + + // Check if the value length matches the specified length. + if value.len() != length { + return Err(Error::InvalidLength(format!( + "value len {} != declared length {}", + value.len(), + length + ))); + } + + let header = Header { + packet_id, + tag, + length, + }; + + match tag { + tlv::Tag::DownloadPiece => { + let download_piece = + tlv::download_piece::DownloadPiece::from_bytes(value.freeze())?; + Ok(Vortex::DownloadPiece(header, download_piece)) + } + tlv::Tag::PieceContent => { + let piece_content = tlv::piece_content::PieceContent::from_bytes(value.freeze())?; + Ok(Vortex::PieceContent(header, piece_content)) + } + tlv::Tag::Reserved(_) => Ok(Vortex::Reserved(header)), + tlv::Tag::Error => { + let error = tlv::error::Error::from_bytes(value.freeze())?; + Ok(Vortex::Error(header, error)) + } + } + } + + /// to_bytes converts the Vortex packet to a byte slice. + pub fn to_bytes(&self) -> bytes::Bytes { + let (header, value) = match self { + Vortex::DownloadPiece(header, download_piece) => (header, download_piece.to_bytes()), + Vortex::PieceContent(header, piece_content) => (header, piece_content.to_bytes()), + Vortex::Reserved(header) => (header, bytes::Bytes::new()), + Vortex::Error(header, err) => (header, err.to_bytes()), + }; + + let mut bytes = BytesMut::with_capacity(HEADER_SIZE + value.len()); + bytes.put_u8(header.packet_id); + bytes.put_u8(header.tag.into()); + bytes.put_u32(value.len() as u32); + bytes.extend_from_slice(&value); + bytes.freeze() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tlv::Tag; + use bytes::Bytes; + + #[test] + fn test_new_download_piece() { + let tag = Tag::DownloadPiece; + let value = Bytes::from("a".repeat(32) + "-42"); + let packet = Vortex::new(tag, value.clone()).expect("Failed to create Vortex packet"); + + assert_eq!(packet.packet_id(), packet.packet_id()); + assert_eq!(packet.tag(), &tag); + assert_eq!(packet.length(), value.len()); + } + + #[test] + fn test_new_piece_content() { + let tag = Tag::PieceContent; + let value = Bytes::from("Hello, world!"); + let packet = Vortex::new(tag, value.clone()).expect("Failed to create Vortex packet"); + + assert_eq!(packet.packet_id(), packet.packet_id()); + assert_eq!(packet.tag(), &tag); + assert_eq!(packet.length(), value.len()); + } + + #[test] + fn test_from_bytes() { + let tag = Tag::DownloadPiece; + let value = Bytes::from("a".repeat(32) + "-42"); + let packet = Vortex::new(tag, value.clone()).expect("Failed to create Vortex packet"); + let bytes = packet.to_bytes(); + let parsed_packet = + Vortex::from_bytes(bytes).expect("Failed to parse Vortex packet from bytes"); + + assert_eq!(parsed_packet.packet_id(), packet.packet_id()); + assert_eq!(parsed_packet.tag(), packet.tag()); + assert_eq!(parsed_packet.length(), packet.length()); + } + + #[test] + fn test_to_bytes() { + let tag = Tag::DownloadPiece; + let value = Bytes::from("a".repeat(32) + "-42"); + let packet = Vortex::new(tag, value.clone()).expect("Failed to create Vortex packet"); + let bytes = packet.to_bytes(); + let parsed_packet = + Vortex::from_bytes(bytes).expect("Failed to parse Vortex packet from bytes"); + + assert_eq!(parsed_packet.to_bytes(), packet.to_bytes()); + } + + #[test] + fn test_error_handling() { + let tag = Tag::Error; + let value = Bytes::from("1:Error message"); + let packet = Vortex::new(tag, value.clone()).expect("Failed to create Vortex packet"); + + assert_eq!(packet.tag(), &tag); + assert_eq!(packet.length(), value.len()); + } +} diff --git a/src/tlv/download_piece.rs b/src/tlv/download_piece.rs new file mode 100644 index 0000000..dab2450 --- /dev/null +++ b/src/tlv/download_piece.rs @@ -0,0 +1,162 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::error::{Error, Result}; +use bytes::{BufMut, Bytes, BytesMut}; + +/// SEPARATOR is the separator character used in the download piece request, separating the task ID +/// and piece number. It is a hyphen character '-'. +const SEPARATOR: u8 = b'-'; + +/// DOWNLOAD_PIECE_SIZE is the size of the download piece request, including the task ID, separator, +/// and piece number. +const DOWNLOAD_PIECE_SIZE: usize = 32 + 1 + 4; + +/// DownloadPiece represents a download piece request. +/// +/// Value Format: +/// - Task ID (32 bytes): SHA-256 hash of the task ID. +/// - Separator (1 byte): Separator character '-'. +/// - Piece Number (4 bytes): Piece number to download. +/// +/// ```text +/// -------------------------------------------------------------------- +/// | Task ID (32 bytes) | Separator (1 byte) | Piece Number (4 bytes) | +/// -------------------------------------------------------------------- +/// ``` +#[derive(Debug, Clone)] +pub struct DownloadPiece { + task_id: String, + piece_number: u32, +} + +/// DownloadPiece implements the DownloadPiece functions. +impl DownloadPiece { + /// new creates a new DownloadPiece request. + pub fn new(task_id: String, piece_number: u32) -> Self { + DownloadPiece { + task_id, + piece_number, + } + } + + /// task_id returns the task ID. + pub fn task_id(&self) -> &str { + &self.task_id + } + + /// piece_number returns the piece number. + pub fn piece_number(&self) -> u32 { + self.piece_number + } + + /// len returns the length of the download piece request. + pub fn len(&self) -> usize { + DOWNLOAD_PIECE_SIZE + } + + /// is_empty returns whether the download piece request is empty. + pub fn is_empty(&self) -> bool { + self.task_id.is_empty() + } + + /// from_bytes creates a download piece request from a byte slice. + pub fn from_bytes(bytes: Bytes) -> Result { + let mut parts = bytes.splitn(2, |&b| b == SEPARATOR); + let task_id = std::str::from_utf8( + parts + .next() + .ok_or(Error::InvalidPacket("missing task id".to_string()))?, + )? + .to_string(); + + let piece_number: u32 = std::str::from_utf8( + parts + .next() + .ok_or(Error::InvalidPacket("missing piece number".to_string()))?, + )? + .trim() + .parse()?; + + Ok(DownloadPiece { + task_id, + piece_number, + }) + } + + /// to_bytes converts the download piece request to a byte slice. + pub fn to_bytes(&self) -> Bytes { + let mut bytes = BytesMut::with_capacity(DOWNLOAD_PIECE_SIZE); + bytes.extend_from_slice(self.task_id.as_bytes()); + bytes.put_u8(SEPARATOR); + bytes.extend_from_slice(self.piece_number.to_string().as_bytes()); + bytes.freeze() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + + #[test] + fn test_new() { + let task_id = "a".repeat(32); + let piece_number = 42; + let download_piece = DownloadPiece::new(task_id.clone(), piece_number); + + assert_eq!(download_piece.task_id(), task_id); + assert_eq!(download_piece.piece_number(), piece_number); + assert_eq!(download_piece.len(), DOWNLOAD_PIECE_SIZE); + } + + #[test] + fn test_is_empty() { + let download_piece_empty = DownloadPiece::new("".to_string(), 0); + let download_piece_non_empty = DownloadPiece::new("a".repeat(32), 1); + + assert!(download_piece_empty.is_empty()); + assert!(!download_piece_non_empty.is_empty()); + } + + #[test] + fn test_to_bytes_and_from_bytes() { + let task_id = "a".repeat(32); + let piece_number = 42; + let download_piece = DownloadPiece::new(task_id.clone(), piece_number); + + let bytes = download_piece.to_bytes(); + let download_piece_decoded = DownloadPiece::from_bytes(bytes).unwrap(); + + assert_eq!(download_piece_decoded.task_id(), task_id); + assert_eq!(download_piece_decoded.piece_number(), piece_number); + } + + #[test] + fn test_from_bytes_invalid_input() { + // Test missing separator. + let invalid_bytes = Bytes::from("invalid_input_without_separator"); + assert!(DownloadPiece::from_bytes(invalid_bytes).is_err()); + + // Test missing piece number. + let invalid_bytes = Bytes::from(format!("{}-", "a".repeat(32))); + assert!(DownloadPiece::from_bytes(invalid_bytes).is_err()); + + // Test invalid piece number format. + let invalid_bytes = Bytes::from(format!("{}-invalid", "a".repeat(32))); + assert!(DownloadPiece::from_bytes(invalid_bytes).is_err()); + } +} diff --git a/src/tlv/error.rs b/src/tlv/error.rs new file mode 100644 index 0000000..dc51a32 --- /dev/null +++ b/src/tlv/error.rs @@ -0,0 +1,212 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::error::{Error as VortexError, Result as VortexResult}; +use bytes::{BufMut, Bytes, BytesMut}; + +/// SEPARATOR is the separator character used in the error request, separating the error code +/// and error message. It is a hyphen character ':'. +const SEPARATOR: u8 = b':'; + +/// Code represents a error code. +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Code { + /// Unknown error. + Unknown = 0, + + /// Invalid argument, such as an invalid task ID or piece number. + InvalidArgument = 1, + + /// Resource not found, such as a missing piece. + NotFound = 2, + + /// Internal error, such as an unexpected error. + Internal = 3, + + /// Reserved for future use. + Reserved(u8), // For tags 4-254 +} + +/// Implement TryFrom for Code. +impl TryFrom for Code { + type Error = (); + + /// Converts a u8 to a Code enum. + fn try_from(value: u8) -> std::result::Result { + match value { + 0 => Ok(Code::Unknown), + 1 => Ok(Code::InvalidArgument), + 2 => Ok(Code::NotFound), + 3 => Ok(Code::Internal), + 4..=255 => Ok(Code::Reserved(value)), + } + } +} + +/// Implement From for u8. +impl From for u8 { + /// Converts a Code enum to a u8. + fn from(code: Code) -> u8 { + match code { + Code::Unknown => 0, + Code::InvalidArgument => 1, + Code::NotFound => 2, + Code::Internal => 3, + Code::Reserved(value) => value, + } + } +} + +/// Error represents a error request. +/// +/// Value Format: +/// - Error Code (8 bits): Error code. +/// - Separator (1 byte): Separator character ':'. +/// - Error Message (variable): Error message. +/// +/// ```text +/// ---------------------------------------------------------------------- +/// | Error Code (8 bits) | Separator (1 byte) | Error Message | +/// ---------------------------------------------------------------------- +/// ``` +#[derive(Debug, Clone)] +pub struct Error { + code: Code, + message: String, +} + +/// Error implements the Error functions. +impl Error { + /// new creates a new Error request. + pub fn new(code: Code, message: String) -> Self { + Error { code, message } + } + + /// code returns the error code. + pub fn code(&self) -> Code { + self.code + } + + /// message returns the error message. + pub fn message(&self) -> &str { + &self.message + } + + /// len returns the length of the error request, including the error code, separator, and error + /// message. + pub fn len(&self) -> usize { + self.message.len() + 2 + } + + /// is_empty returns whether the error request is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// from_bytes creates a error request from a byte slice. + pub fn from_bytes(bytes: Bytes) -> VortexResult { + let mut parts = bytes.splitn(2, |&b| b == SEPARATOR); + + let code = parts + .next() + .ok_or(VortexError::InvalidPacket("missing error code".to_string()))?; + + if code.len() != 1 { + return Err(VortexError::InvalidPacket("invalid error code".to_string())); + } + + let code: Code = code + .first() + .copied() + .ok_or(VortexError::InvalidPacket("missing error code".to_string()))? + .try_into() + .unwrap(); + + let message = std::str::from_utf8(parts.next().ok_or(VortexError::InvalidPacket( + "missing error message".to_string(), + ))?)? + .to_string(); + + if message.is_empty() { + return Err(VortexError::InvalidPacket( + "empty error message".to_string(), + )); + } + + Ok(Error { code, message }) + } + + /// to_bytes converts the error request to a byte slice. + pub fn to_bytes(&self) -> Bytes { + let mut bytes = BytesMut::with_capacity(self.len()); + bytes.put_u8(self.code.into()); + bytes.put_u8(SEPARATOR); + bytes.extend_from_slice(self.message.as_bytes()); + bytes.freeze() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + + #[test] + fn test_new() { + let code = Code::InvalidArgument; + let message = "Invalid argument".to_string(); + let error = Error::new(code, message.clone()); + + assert_eq!(error.code(), code); + assert_eq!(error.message(), message); + assert_eq!(error.len(), message.len() + 2); + } + + #[test] + fn test_is_non_empty() { + let error_non_empty = Error::new(Code::Unknown, "Error message".to_string()); + assert!(!error_non_empty.is_empty()); + } + + #[test] + fn test_to_bytes_and_from_bytes() { + let code = Code::NotFound; + let message = "Resource not found".to_string(); + let error = Error::new(code, message.clone()); + + let bytes = error.to_bytes(); + let error_decoded = Error::from_bytes(bytes).unwrap(); + + assert_eq!(error_decoded.code(), code); + assert_eq!(error_decoded.message(), message); + } + + #[test] + fn test_from_bytes_invalid_input() { + // Test missing separator + let invalid_bytes = Bytes::from("invalid_input_without_separator"); + assert!(Error::from_bytes(invalid_bytes).is_err()); + + // Test missing error message + let invalid_bytes = Bytes::from(format!("{}:", 1)); + assert!(Error::from_bytes(invalid_bytes).is_err()); + + // Test invalid error code format + let invalid_bytes = Bytes::from(format!("{}:{}", 256, "Invalid code")); + assert!(Error::from_bytes(invalid_bytes).is_err()); + } +} diff --git a/src/tlv/mod.rs b/src/tlv/mod.rs new file mode 100644 index 0000000..a9caac4 --- /dev/null +++ b/src/tlv/mod.rs @@ -0,0 +1,88 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +pub mod download_piece; +pub mod error; +pub mod piece_content; + +/// Tag Definitions +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Tag { + /// Download the content of a piece from a peer. It is composed of `{Task ID}-{Piece ID}`, + /// where the Task ID is a 32-byte SHA-256 value and the Piece ID is a number. + DownloadPiece = 0, + + /// The content of a piece, with a maximum size of 4 GiB per piece. + PieceContent = 1, + + /// Reserved for future use, for tags 2-254. + Reserved(u8), + + /// Error message. + Error = 255, +} + +/// Implement TryFrom for Tag. +impl TryFrom for Tag { + type Error = (); + + /// Converts a u8 to a Tag enum. + fn try_from(value: u8) -> std::result::Result { + match value { + 0 => Ok(Tag::DownloadPiece), + 1 => Ok(Tag::PieceContent), + 2..=254 => Ok(Tag::Reserved(value)), + 255 => Ok(Tag::Error), + } + } +} + +/// Implement From for u8. +impl From for u8 { + /// Converts a Tag enum to a u8. + fn from(tag: Tag) -> Self { + match tag { + Tag::DownloadPiece => 0, + Tag::PieceContent => 1, + Tag::Reserved(value) => value, + Tag::Error => 255, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_try_from_u8() { + assert_eq!(Tag::try_from(0), Ok(Tag::DownloadPiece)); + assert_eq!(Tag::try_from(1), Ok(Tag::PieceContent)); + assert_eq!(Tag::try_from(2), Ok(Tag::Reserved(2))); + assert_eq!(Tag::try_from(254), Ok(Tag::Reserved(254))); + assert_eq!(Tag::try_from(255), Ok(Tag::Error)); + } + + #[test] + fn test_from_tag() { + assert_eq!(u8::from(Tag::DownloadPiece), 0); + assert_eq!(u8::from(Tag::PieceContent), 1); + assert_eq!(u8::from(Tag::Reserved(2)), 2); + assert_eq!(u8::from(Tag::Reserved(254)), 254); + assert_eq!(u8::from(Tag::Error), 255); + } +} diff --git a/src/tlv/piece_content.rs b/src/tlv/piece_content.rs new file mode 100644 index 0000000..20ff73a --- /dev/null +++ b/src/tlv/piece_content.rs @@ -0,0 +1,97 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::error::{Error, Result}; +use bytes::Bytes; + +/// PieceContent represents the content of a piece. +#[derive(Debug, Clone)] +pub struct PieceContent(bytes::Bytes); + +/// PieceContent implements the PieceContent functions. +impl PieceContent { + /// new creates a new PieceContent request. + pub fn new(content: Bytes) -> Self { + PieceContent(content) + } + + /// len returns the length of the piece content request. + pub fn len(&self) -> usize { + self.0.len() + } + + /// is_empty returns whether the piece content request is empty. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// from_bytes creates a piece content request from a byte slice. + pub fn from_bytes(bytes: Bytes) -> Result { + if bytes.is_empty() { + return Err(Error::InvalidLength("piece content is empty".to_string())); + } + + Ok(PieceContent(bytes)) + } + + /// to_bytes converts the piece content request to a byte slice. + pub fn to_bytes(&self) -> bytes::Bytes { + self.0.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + + #[test] + fn test_new() { + let content = Bytes::from("test content"); + let piece_content = PieceContent::new(content.clone()); + + assert_eq!(piece_content.len(), content.len()); + assert_eq!(piece_content.to_bytes(), content); + } + + #[test] + fn test_is_empty() { + let empty_content = PieceContent::new(Bytes::new()); + let non_empty_content = PieceContent::new(Bytes::from("test content")); + + assert!(empty_content.is_empty()); + assert!(!non_empty_content.is_empty()); + } + + #[test] + fn test_to_bytes_and_from_bytes() { + let content = Bytes::from("test content"); + let piece_content = PieceContent::new(content.clone()); + + let bytes = piece_content.to_bytes(); + let piece_content_decoded = PieceContent::from_bytes(bytes).unwrap(); + + assert_eq!(piece_content_decoded.len(), content.len()); + assert_eq!(piece_content_decoded.to_bytes(), content); + } + + #[test] + fn test_from_bytes_invalid_input() { + // Test empty content. + let empty_bytes = Bytes::new(); + assert!(PieceContent::from_bytes(empty_bytes).is_err()); + } +}