diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..4e6ff90 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "alvarium-sdk-rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["rustls"] +native-tls = ["dep:native-tls"] +rustls = ["dep:rustls", "webpki-roots"] + +[dependencies] +tokio = { version = "1.32.0", features = ["rt", "macros"] } +chrono = "0.4.22" +lazy_static = "1.4.0" + +md5-rs = "0.1.5" +hex = "0.4.3" +base64 = "0.13.0" + +alvarium-annotator = { git = "https://github.com/project-alvarium/alvarium-annotator" } +streams = { git = "https://github.com/demia-protocol/streams", branch = "develop", default-features = false, features = ["utangle-client", "did"] } +iota-crypto = { git = "https://github.com/iotaledger/crypto.rs", rev = "f6f88fc", features = ["ed25519", "sha", "random"]} + +serde = "1.0.143" +serde_json = "1.0.83" +gethostname = "0.2.3" + +rand = { version = "0.8.5" } +ulid = "1.0.0" + +reqwest = { version = "0.11.11", default-features = false, features = ["json", "rustls-tls"]} +futures = {version = "0.3.8", default-features = false} +async-trait = "0.1.57" + +rustls = { version = "0.21.2", optional = true } +rustls-pemfile = "1.0.2" +native-tls = { version = "0.2.11", optional = true } +webpki-roots = { version = "0.23.1", optional = true } + +rumqttc = "0.22.0" + +thiserror = "1.0.40" +log = "0.4" +fern = "0.6.2" + +[target.'cfg(unix)'.dependencies] +libc = "0.2.146" diff --git a/README.md b/README.md index 75aca08..f01a8df 100644 --- a/README.md +++ b/README.md @@ -1 +1,244 @@ -# alvarium-sdk-rust \ No newline at end of file +# Alvarium Rust SDK + +This is an implementation of the Alvarium SDK in Rust. It provides parity with the +[go implementation](https://github.com/project-alvarium/alvarium-sdk-go). + + +Additionally, it implements the traits and core providers from the +[alvarium-annotator](https://github.com/project-alvarium/alvarium-annotator) library, +making up the core implementation. + + +## Usage +The first thing you will need to do is set up your configuration file. An example file can be +found [here](resources/test_config.json). You can copy this file and update with the appropriate +provider details. + +Examples of stream provider configurations can be found [here](resources/mqtt_stream_config.json) +for mqtt and [here](resources/demia_streams_config.json) for a Demia (powered by IOTA) Streams provider. + + +To include the rust sdk in your project insert the following into your Cargo.toml file + +``` +[dependencies] +alvarium-annotator = { git = "https://github.com/project-alvarium/AlvariumAnnotator" } +alvarium-rust-sdk = { git = "https://github.com/project-alvarium/alvarium-rust-sdk" } +``` + +To use the sdk, you will also need to include an asynchronous runtime environment such as +[tokio](https://github.com/tokio-rs/tokio). + +``` +[dependencies] +tokio = "1.35.1" +``` + +Then you can get started using the sdk itself. + +```rust +// This is the main sdk implementation +use alvarium_rust_sdk::sdk::SDK; +// You can use these factories to generate the core sdk annotators and signature providers +use alvarium_rust_sdk::factories::{new_annotator, new_signature_provider}; +// This is where you will find configuration breakdowns for deserialisation +use alvarium_rust_sdk::config::{self, SdkInfo, StreamConfig}; +// Here you can find network stream providers +use alvarium_rust_sdk::providers::stream_provider::{DemiaPublisher, MqttPublisher}; + + +#[macro_use] +extern crate lazy_static; +extern crate core; +// Creates a static CONFIG_BYTES value from the ./config.json file if it exists +lazy_static! { + pub static ref CONFIG_BYTES: Vec = { + match std::fs::read("config/config.json") { + Ok(config_bytes) => config_bytes, + Err(_) => vec![] + } + }; +} + +#[tokio::main] +async fn main() { + // Get configurations from the static configuration bytes + let sdk_info: SdkInfo = serde_json::from_slice(CONFIG_BYTES.as_slice())?; + // Prepare the signature provider + let signature_provider = new_signature_provider( &sdk_info.signature)?; + + // Create a vector of annotators for the alvarium sdk instance + let mut annotators: Vec + '_>> = Vec::new(); + for ann in &sdk_info.annotators { + // generate a new annotator from the sdk factory + annotators.push(new_annotator(ann.clone(), sdk_info.clone())?); + } + + // Create the alvarium SDK instance to annotate sensor data + let mut sdk: SDK<'_, IotaPublisher> = SDK::new(sdk_info, annotators.as_mut_slice()).await?; + + // Source your data + let arbitrary_data = "Some data to send".as_bytes(); + + // For PKI annotators, data should be wrapped in a Signable wrapper + let sig = signature_provider.sign(&serde_json::to_vec(arbitrary_data)?)?; + let data = Signable::new(serde_json::to_string(&arbitrary_data)?, sig); + + // New data creation annotation + sdk.create(data.to_bytes().as_slice()).await?; +} +``` + + + +#### Custom Annotators +Annotations are designed to provide universally accepted metadata for various interactions with data +along its lifecycle. Currently, there are 4 annotator types provided through the core sdk: +Tpm confirmation, Tls usage confirmation, Source annotation, and Pki verification. These help to set a +foundation of annotations that will be provided, but this does not serve all the possible annotatable +use cases that one might need for an application/project. In order to accommodate that, the concept of an +Annotator has been abstracted to an interface (trait), so that custom annotators can be developed and used +within the Sdk. + +An example implementation of this would be as follows + +```rust +use alvarium_annotator::{ + Annotation, Annotator, constants, derive_hash, serialise_and_sign, + constants::AnnotationType, +}; +use alvarium_rust_sdk::{ + config::{self, Signable}, + factories::{new_hash_provider, new_signature_provider}, + providers::sign_provider::SignatureProviderWrap +}; + +/// Defines a new annotator type that will implement the Annotator trait +pub struct ThresholdAnnotator { + /// Hashing algorithm used for checksums + hash: constants::HashType, + /// Type of annotation (a wrapper around a string definition) + kind: AnnotationType, + /// Signature provider for signing data + sign: SignatureProviderWrap, + /// Threshold limits for custom annotation + range: Range, +} + +impl ThresholdAnnotator { + pub fn new(cfg: &config::SdkInfo, range: Range) -> Result> { + Ok(ThresholdAnnotator { + hash: cfg.hash.hash_type.clone(), + kind: AnnotationType("threshold".to_string()), + sign: new_signature_provider(&cfg.signature)?, + range, + }) + } + +} + +/// Implementation of the annotate() function for generating a threshold Annotation +impl Annotator for ThresholdAnnotator { + type Error = alvarium_rust_sdk::errors::Error; + fn annotate(&mut self, data: `&[u8]`) -> alvarium_rust_sdk::errors::Result { + let hasher = new_hash_provider(&self.hash)?; + let signable: Signable = serde_json::from_slice(data)?; + let key = derive_hash(hasher, signable.seed.as_bytes()); + + match gethostname::gethostname().to_str() { + Some(host) => { + let reading: std::result::Result = serde_json::from_slice(data); + let within_threshold = match reading { + Ok(reading) => { + let reading: SensorReading = serde_json::from_str(&signable.seed).unwrap(); + reading.value <= self.range.end && reading.value >= self.range.start + }, + Err(_) => false + }; + + let mut annotation = Annotation::new(&key, self.hash.clone(), host, self.kind.clone(), within_threshold); + let signature = serialise_and_sign(&self.sign, &annotation)?; + annotation.with_signature(&signature); + Ok(annotation) + }, + None => { + Err(alvarium_rust_sdk::errors::Error::NoHostName.into()) + } + } + } +} +``` + +#### Custom Stream Providers +The base SDK includes an Mqtt and Demia Streams provider, but new stream providers can be created +using the alvarium-annotator [Publisher](https://github.com/project-alvarium/alvarium-annotator/blob/main/src/providers.rs#L124) +trait. So long as this trait is implemented, any custom streaming layer provider will be compatible +with the SDK. + +You can see the localised implementations for the Mqtt and Demia providers [here](src/providers/stream_provider/mqtt.rs) +and [here](src/providers/stream_provider/demia.rs). + + +### API +The SDK provides a simple API for generating annotations dependent on the actions being taken. + +NewSdk(), Create(), Mutate(), Transit(), Publish() and BootstrapHandler(). + +#### SDK::new() + +```rust +pub async fn new(cfg: SdkInfo, annotators: &'a mut [Box]) -> crate::errors::Result> +``` + +Used to instantiate a new SDK instance with the specified list of annotators. + +Takes a list of annotators, and a populated configuration. Returns an SDK instance. + +#### Create() + +```rust + pub async fn create(&mut self, data: `&[u8]`) -> crate::errors::Result<()> +``` + +Used to register creation of new data with the SDK. Passes data through the SDK instance's list of annotators. + +##### Parameters +- data: `&[u8]` -- The data being handled represented as a byte array + + +#### Mutate() + +```rust +pub async fn mutate(&mut self, old: &[u8], new: &[u8]) -> crate::errors::Result<()> +``` + +Used to register mutation of existing data with the SDK. Passes data through the SDK instance's list of annotators. + +##### Parameters +- old: &[u8] -- The source data item that is being modified, represented as a byte array +- new: &[u8] -- The new data item resulting from the change, represented as a byte array + +Calling this method will link the old data to the new in a lineage. Specific annotations will be applied to the `new` data element. + +#### Transit() + +```rust +pub async fn transit(&mut self, data: `&[u8]`) -> crate::errors::Result<()> +``` + +Used to annotate data that is neither originated or modified but simply handed from one application to another. + +##### Parameters +- data: `&[u8]` -- The data being handled represented as a byte array + + +#### Publish() + +```rust +pub async fn publish(&mut self, data: `&[u8]`) -> crate::errors::Result<()> +``` + +Used to annotate data that is neither originated or modified but **before** being handed to another application. + +##### Parameters +- data: `&[u8]` -- The data being handled represented as a byte array diff --git a/resources/demia_streams_config.json b/resources/demia_streams_config.json new file mode 100644 index 0000000..62f6e04 --- /dev/null +++ b/resources/demia_streams_config.json @@ -0,0 +1,21 @@ +{ + "type": "demia", + "config": { + "provider": { + "host": "localhost", + "protocol": "http", + "port": 8080 + }, + "tangle": { + "host": "nodes.02.demia-testing-domain.com", + "protocol": "https", + "port": 443 + }, + "encoding": "utf-8", + "topic": "Publisher Unique Topic", + "backup": { + "path": "temp_file", + "password": "password" + } + } +} \ No newline at end of file diff --git a/resources/mqtt_stream_config.json b/resources/mqtt_stream_config.json new file mode 100644 index 0000000..d6bd7ae --- /dev/null +++ b/resources/mqtt_stream_config.json @@ -0,0 +1,21 @@ +{ + "type": "mqtt", + "config": { + "clientId": "A client ID", + "keepAlive": 5, + "boundedCap": 100, + "qos": 1, + "user": "A user ID", + "password": "Stream Password", + "provider": { + "host": "test.mosquitto.org", + "port": 8883, + "protocol": "tls" + }, + "cleanness": false, + "topics": [ + "Topic1", + "Topic2" + ] + } +} \ No newline at end of file diff --git a/resources/test_config.json b/resources/test_config.json new file mode 100644 index 0000000..8aa1fbd --- /dev/null +++ b/resources/test_config.json @@ -0,0 +1,45 @@ +{ + "annotators": [ + "pki", + "source", + "tls" + ], + "hash": { + "type": "sha256" + }, + "signature": { + "public": { + "type": "ed25519", + "path": "./resources/test_keys/public.key" + }, + "private": { + "type": "ed25519", + "path": "./resources/test_keys/private.key" + } + }, + "stream": { + "type": "demia", + "config": { + "provider": { + "host": "localhost", + "protocol": "http", + "port": 8080 + }, + "tangle": { + "host": "nodes.02.demia-testing-domain.com", + "protocol": "http", + "port": 14102 + }, + "encoding": "utf-8", + "topic": "Publisher Unique Topic", + "backup": { + "path": "temp_file", + "password": "password" + } + } + }, + "logging": { + "level": "info", + "debug_output": "output.log" + } +} \ No newline at end of file diff --git a/resources/test_keys/private.key b/resources/test_keys/private.key new file mode 100644 index 0000000..d2cbe54 --- /dev/null +++ b/resources/test_keys/private.key @@ -0,0 +1 @@ +f2068f3956ce804ed792a3a90617692827b960d83cba56b9ca59f910f7a5f70e \ No newline at end of file diff --git a/resources/test_keys/public.key b/resources/test_keys/public.key new file mode 100644 index 0000000..abc9413 --- /dev/null +++ b/resources/test_keys/public.key @@ -0,0 +1 @@ +3dd5d326bbd881172ce74f1979d7159b5f86e7c50ba5dca08db2ded78afb2e23 \ No newline at end of file diff --git a/src/annotations/annotators/mod.rs b/src/annotations/annotators/mod.rs new file mode 100644 index 0000000..4b30a6d --- /dev/null +++ b/src/annotations/annotators/mod.rs @@ -0,0 +1,17 @@ +#![allow(clippy::new_ret_no_self)] +mod pki; +mod source; +mod tls; +mod tpm; + +pub use pki::*; +pub use source::*; +pub use tls::*; +pub use tpm::*; + + +#[test] +fn unknown_annotator_failure() { + let ann_type = alvarium_annotator::constants::AnnotationType::try_from("unknown"); + assert!(ann_type.is_err()) +} \ No newline at end of file diff --git a/src/annotations/annotators/pki.rs b/src/annotations/annotators/pki.rs new file mode 100644 index 0000000..c477c5b --- /dev/null +++ b/src/annotations/annotators/pki.rs @@ -0,0 +1,127 @@ +use crate::annotations::{ + Annotation, + Annotator, + constants, +}; +use crate::{config::{self, Signable}}; +use crate::providers::sign_provider::SignatureProviderWrap; +use alvarium_annotator::{derive_hash, serialise_and_sign}; +use crate::factories::{new_hash_provider, new_signature_provider}; +use crate::errors::{Result, Error}; + +pub struct PkiAnnotator { + hash: constants::HashType, + kind: constants::AnnotationType, + sign: SignatureProviderWrap, +} + +impl PkiAnnotator { + pub fn new(cfg: &config::SdkInfo) -> Result> { + Ok(PkiAnnotator { + hash: cfg.hash.hash_type.clone(), + kind: constants::ANNOTATION_PKI.clone(), + sign: new_signature_provider(&cfg.signature)?, + }) + } +} + +impl Annotator for PkiAnnotator { + type Error = crate::errors::Error; + fn annotate(&mut self, data: &[u8]) -> Result { + let hasher = new_hash_provider(&self.hash)?; + let signable: std::result::Result = serde_json::from_slice(data); + let (verified, key) = match signable { + Ok(signable) => { + let key = derive_hash(hasher, signable.seed.as_bytes()); + (signable.verify_signature(&self.sign)?, key) + }, + Err(_) => (false, derive_hash(hasher, data)), + }; + match gethostname::gethostname().to_str() { + Some(host) => { + let mut annotation = Annotation::new(&key, self.hash.clone(), host, self.kind.clone(), verified); + let signature = serialise_and_sign(&self.sign, &annotation)?; + annotation.with_signature(&signature); + Ok(annotation) + }, + None => Err(Error::NoHostName) + } + } +} + + +#[cfg(test)] +mod pki_tests { + use log::info; + use crate::{config, providers::sign_provider::get_priv_key}; + use crate::annotations::{Annotator, PkiAnnotator, constants}; + use crate::config::Signable; + + #[test] + fn valid_and_invalid_pki_annotator() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let mut config2 = config.clone(); + config2.hash.hash_type = constants::HashType("Not a known hash type".to_string()); + + let data = String::from("Some random data"); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + + let signable = Signable::new(data, sig); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut pki_annotator_1 = PkiAnnotator::new(&config).unwrap(); + let mut pki_annotator_2 = PkiAnnotator::new(&config2).unwrap(); + + let valid_annotation = pki_annotator_1.annotate(&serialised).unwrap(); + let invalid_annotation = pki_annotator_2.annotate(&serialised); + + assert!(valid_annotation.validate_base()); + assert!(invalid_annotation.is_err()); + } + + + #[test] + fn make_pki_annotation() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + info!("config {}", config.signature.private_key_info.path); + let data = String::from("Some random data"); + let priv_key_file = std::fs::read(&config.signature.private_key_info.path).unwrap(); + let priv_key_string = String::from_utf8(priv_key_file).unwrap(); + let priv_key = get_priv_key(&priv_key_string).unwrap(); + let sig = priv_key.sign(data.as_bytes()); + + let signable = Signable::new(data, hex::encode(sig.to_bytes())); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut pki_annotator = PkiAnnotator::new(&config).unwrap(); + let annotation = pki_annotator.annotate(&serialised).unwrap(); + + assert!(annotation.validate_base()); + assert_eq!(annotation.kind, *constants::ANNOTATION_PKI); + assert_eq!(annotation.host, gethostname::gethostname().to_str().unwrap()); + assert_eq!(annotation.hash, config.hash.hash_type); + assert!(annotation.is_satisfied) + } + + #[test] + fn unsatisfied_pki_annotation() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let data = String::from("Some random data"); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + + let signable = Signable::new(data, sig); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut pki_annotator = PkiAnnotator::new(&config).unwrap(); + let annotation = pki_annotator.annotate(&serialised).unwrap(); + + assert!(annotation.validate_base()); + assert_eq!(annotation.kind, *constants::ANNOTATION_PKI); + assert_eq!(annotation.host, gethostname::gethostname().to_str().unwrap()); + assert_eq!(annotation.hash, config.hash.hash_type); + assert!(!annotation.is_satisfied) + } +} \ No newline at end of file diff --git a/src/annotations/annotators/source.rs b/src/annotations/annotators/source.rs new file mode 100644 index 0000000..da37806 --- /dev/null +++ b/src/annotations/annotators/source.rs @@ -0,0 +1,102 @@ +use crate::annotations::{ + Annotation, + Annotator, + constants, +}; +use crate::config; +use alvarium_annotator::{derive_hash, serialise_and_sign}; +use crate::config::Signable; +use crate::factories::{new_hash_provider, new_signature_provider}; +use crate::providers::sign_provider::SignatureProviderWrap; +use crate::errors::{Error, Result}; + +pub struct SourceAnnotator { + hash: constants::HashType, + kind: constants::AnnotationType, + sign: SignatureProviderWrap, +} + +impl SourceAnnotator { + pub fn new(cfg: &config::SdkInfo) -> Result> { + Ok(SourceAnnotator { + hash: cfg.hash.hash_type.clone(), + kind: constants::ANNOTATION_SOURCE.clone(), + sign: new_signature_provider(&cfg.signature)?, + }) + } +} + +impl Annotator for SourceAnnotator { + type Error = crate::errors::Error; + fn annotate(&mut self, data: &[u8]) -> Result { + let hasher = new_hash_provider(&self.hash)?; + let signable: std::result::Result = serde_json::from_slice(data); + let key = match signable { + Ok(signable) => derive_hash(hasher, signable.seed.as_bytes()), + Err(_) => derive_hash(hasher, data), + }; + match gethostname::gethostname().to_str() { + Some(host) => { + let mut annotation = Annotation::new(&key, self.hash.clone(), host, self.kind.clone(), true); + let signature = serialise_and_sign(&self.sign, &annotation)?; + annotation.with_signature(&signature); + Ok(annotation) + }, + None => Err(Error::NoHostName) + } + } +} + + +#[cfg(test)] +mod source_tests { + use crate::{config, providers::sign_provider::get_priv_key}; + use crate::annotations::{Annotator, constants, SourceAnnotator}; + use crate::config::Signable; + + #[test] + fn valid_and_invalid_source_annotator() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let mut config2 = config.clone(); + config2.hash.hash_type = constants::HashType("Not a known hash type".to_string()); + + let data = String::from("Some random data"); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + + let signable = Signable::new(data, sig); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut source_annotator_1 = SourceAnnotator::new(&config).unwrap(); + let mut source_annotator_2 = SourceAnnotator::new(&config2).unwrap(); + let valid_annotation = source_annotator_1.annotate(&serialised).unwrap(); + let invalid_annotation = source_annotator_2.annotate(&serialised); + + assert!(valid_annotation.validate_base()); + assert!(invalid_annotation.is_err()); + } + + + #[test] + fn make_source_annotation() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let data = String::from("Some random data"); + let priv_key_file = std::fs::read(&config.signature.private_key_info.path).unwrap(); + let priv_key_string = String::from_utf8(priv_key_file).unwrap(); + let priv_key = get_priv_key(&priv_key_string).unwrap(); + let sig = priv_key.sign(data.as_bytes()); + + let signable = Signable::new(data, hex::encode(sig.to_bytes())); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut source_annotator = SourceAnnotator::new(&config).unwrap(); + let annotation = source_annotator.annotate(&serialised).unwrap(); + + assert!(annotation.validate_base()); + assert_eq!(annotation.kind, *constants::ANNOTATION_SOURCE); + assert_eq!(annotation.host, gethostname::gethostname().to_str().unwrap()); + assert_eq!(annotation.hash, config.hash.hash_type); + assert!(annotation.is_satisfied) + } +} \ No newline at end of file diff --git a/src/annotations/annotators/tls.rs b/src/annotations/annotators/tls.rs new file mode 100644 index 0000000..3f730ca --- /dev/null +++ b/src/annotations/annotators/tls.rs @@ -0,0 +1,281 @@ +#[cfg(feature = "rustls")] +use std::io::Read; +use crate::annotations::{Annotation, Annotator, constants}; +use crate::config; +use crate::errors::{Result, Error}; +use alvarium_annotator::{derive_hash, serialise_and_sign}; + + +#[cfg(feature = "native-tls")] +use native_tls::TlsStream; +#[cfg(feature = "rustls")] +use rustls::Connection; +use std::net::TcpStream; +#[cfg(feature = "native-tls")] +use std::sync::Mutex; +use log::info; +use crate::config::Signable; +use crate::factories::{new_hash_provider, new_signature_provider}; +use crate::providers::sign_provider::SignatureProviderWrap; + +pub struct TlsAnnotator{ + hash: constants::HashType, + kind: constants::AnnotationType, + sign: SignatureProviderWrap, + + // TODO: Make type for this + #[cfg(feature = "native-tls")] + conn_native: Option>>, + + #[cfg(feature = "rustls")] + conn_rustls: Option, + #[cfg(feature = "rustls")] + stream: Option, +} + +impl TlsAnnotator { + pub fn new(cfg: &config::SdkInfo) -> Result + Tls> { + Ok(TlsAnnotator { + hash: cfg.hash.hash_type.clone(), + kind: constants::ANNOTATION_TLS.clone(), + sign: new_signature_provider(&cfg.signature)?, + #[cfg(feature = "native-tls")] + conn_native: None, + #[cfg(feature = "rustls")] + conn_rustls: None, + #[cfg(feature = "rustls")] + stream: None, + }) + } +} + +pub trait Tls { + #[cfg(feature = "native-tls")] + fn set_connection_native(&mut self, tls_stream: TlsStream); + #[cfg(feature = "rustls")] + fn set_connection_rustls(&mut self, conn: Connection, stream: TcpStream); + + #[cfg(feature = "native-tls")] + fn check_tls_stream_native(&self) -> bool; + #[cfg(feature = "rustls")] + fn check_tls_stream_rustls(&mut self) -> bool; +} + +impl Tls for TlsAnnotator { + #[cfg(feature = "native-tls")] + fn set_connection_native(&mut self, tls_stream: TlsStream) { + self.conn_native = Some(Mutex::new(tls_stream)); + } + + #[cfg(feature = "rustls")] + fn set_connection_rustls(&mut self, conn: Connection, stream: TcpStream) { + self.stream = Some(stream); + self.conn_rustls = Some(conn); + } + + #[cfg(feature = "native-tls")] + fn check_tls_stream_native(&self) -> bool { + match &self.conn_native { + Some(conn) => conn.lock().unwrap().peer_certificate().is_ok(), + None => false + } + } + + #[cfg(feature = "rustls")] + fn check_tls_stream_rustls(&mut self) -> bool { + if let Some(stream) = self.stream.as_mut() { + info!("Stream exists"); + if let Some(conn) = self.conn_rustls.as_mut() { + info!("Connection exists"); + let mut buf = [0; 1024]; + let mut retries = 0; + + loop { + if conn.wants_write() { + conn.write_tls(stream).unwrap(); + } + + if conn.wants_read() { + if stream.read(&mut buf).unwrap() == 0 { + break; + } + conn.read_tls(stream).unwrap(); + } + + if !conn.is_handshaking() { + break; + } else { + retries += 1; + if retries == 5 { + return false + } + } + } + + let mut buffer = [0; 1]; + match stream.peek(&mut buffer) { + Ok(_) => true, + Err(e) => { + match e.kind() { + std::io::ErrorKind::WouldBlock => true, + std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => { + info!("Connection error in TLS stream: {:?}", e); + false + }, + _ => { + info!("Unexpected error in TLS stream: {:?}", e); + false + } + } + } + } + } else { + false + } + } else { + false + } + } + +} + + +// Create a TLS Server Connection instance to determine if it is being used +impl Annotator for TlsAnnotator { + type Error = crate::errors::Error; + fn annotate(&mut self, data: &[u8]) -> Result { + let hasher = new_hash_provider(&self.hash)?; + let signable: std::result::Result = serde_json::from_slice(data); + let key = match signable { + Ok(signable) => derive_hash(hasher, signable.seed.as_bytes()), + Err(_) => derive_hash(hasher, data), + }; + match gethostname::gethostname().to_str() { + Some(host) => { + #[cfg(all(not(feature = "rustls"), feature = "native-tls"))] + let is_satisfied = self.check_tls_stream_native(); + #[cfg(feature = "rustls")] + let is_satisfied = self.check_tls_stream_rustls(); + + let mut annotation = Annotation::new(&key, self.hash.clone(), host, self.kind.clone(), is_satisfied); + let signature = serialise_and_sign(&self.sign, &annotation)?; + annotation.with_signature(&signature); + Ok(annotation) + }, + None => Err(Error::NoHostName) + } + } +} + + +#[cfg(test)] +mod tls_tests { + use std::sync::Arc; + use rustls::ClientConnection; + use std::net::TcpStream; + use crate::{config, providers::sign_provider::get_priv_key}; + use crate::annotations::{Annotator, constants, TlsAnnotator}; + use crate::config::Signable; + #[cfg(feature = "rustls")] + use super::Tls; + + #[test] + fn valid_and_invalid_tls_annotator() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let mut config2 = config.clone(); + config2.hash.hash_type = constants::HashType("Not a known hash type".to_string()); + + let data = String::from("Some random data"); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + + let signable = Signable::new(data, sig); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut tls_annotator_1 = TlsAnnotator::new(&config).unwrap(); + let mut tls_annotator_2 = TlsAnnotator::new(&config2).unwrap(); + + let valid_annotation = tls_annotator_1.annotate(&serialised).unwrap(); + let invalid_annotation = tls_annotator_2.annotate(&serialised); + + assert!(valid_annotation.validate_base()); + assert!(invalid_annotation.is_err()); + } + + #[cfg(feature = "rustls")] + #[test] + fn make_tls_annotation() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let data = String::from("Some random data"); + let priv_key_file = std::fs::read(&config.signature.private_key_info.path).unwrap(); + let priv_key_string = String::from_utf8(priv_key_file).unwrap(); + let priv_key = get_priv_key(&priv_key_string).unwrap(); + let sig = priv_key.sign(data.as_bytes()); + + let signable = Signable::new(data, hex::encode(sig.to_bytes())); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut tls_annotator = TlsAnnotator::new(&config).unwrap(); + + let conn = make_client_connection().unwrap(); + let tcp_stream = TcpStream::connect("www.google.com:443").unwrap(); + tls_annotator.set_connection_rustls(conn.into(), tcp_stream); + + let annotation = tls_annotator.annotate(&serialised).unwrap(); + + assert!(annotation.validate_base()); + assert_eq!(annotation.kind, *constants::ANNOTATION_TLS); + assert_eq!(annotation.host, gethostname::gethostname().to_str().unwrap()); + assert_eq!(annotation.hash, config.hash.hash_type); + assert!(annotation.is_satisfied) + } + + #[test] + fn unsatisfied_tls_annotation() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let data = String::from("Some random data"); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + + let signable = Signable::new(data, sig); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut tls_annotator = TlsAnnotator::new(&config).unwrap(); + let annotation = tls_annotator.annotate(&serialised).unwrap(); + + assert!(annotation.validate_base()); + assert_eq!(annotation.kind, *constants::ANNOTATION_TLS); + assert_eq!(annotation.host, gethostname::gethostname().to_str().unwrap()); + assert_eq!(annotation.hash, config.hash.hash_type); + assert!(!annotation.is_satisfied) + } + + #[cfg(feature = "rustls")] + fn make_client_connection() -> Result> { + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_trust_anchors( + webpki_roots::TLS_SERVER_ROOTS + .0 + .iter() + .map(|ta| { + rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + }) + ); + let config = rustls::ClientConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13]) + .unwrap() + .with_root_certificates(root_store) + .with_no_client_auth(); + + let server_name = "www.google.com".try_into().unwrap(); + Ok(ClientConnection::new(Arc::new(config), server_name).unwrap()) + } + +} \ No newline at end of file diff --git a/src/annotations/annotators/tpm.rs b/src/annotations/annotators/tpm.rs new file mode 100644 index 0000000..60e76c0 --- /dev/null +++ b/src/annotations/annotators/tpm.rs @@ -0,0 +1,156 @@ +use crate::annotations::{Annotation, Annotator, constants}; +use crate::config; +use crate::errors::{Result, Error}; +use alvarium_annotator::{derive_hash, serialise_and_sign}; + + +#[cfg(unix)] +use std::os::linux::fs::MetadataExt; +#[cfg(windows)] +use std::os::windows::fs::MetadataExt; +use crate::config::Signable; +use crate::factories::{new_hash_provider, new_signature_provider}; +use crate::providers::sign_provider::SignatureProviderWrap; + +const UNIX_TPM_PATH: &str = "/dev/tpm0"; // Adjust the path as needed + +pub struct TpmAnnotator { + hash: constants::HashType, + kind: constants::AnnotationType, + sign: SignatureProviderWrap, +} + +impl TpmAnnotator { + pub fn new(cfg: &config::SdkInfo) -> Result> { + Ok(TpmAnnotator { + hash: cfg.hash.hash_type.clone(), + kind: constants::ANNOTATION_TPM.clone(), + sign: new_signature_provider(&cfg.signature)?, + }) + } + + #[cfg(windows)] + fn check_tpm_presence_windows() -> bool { + let output = std::process::Command::new("tpmtool") + .arg("getdeviceinformation") + .output(); + + match output { + Ok(output) => { + // Check if the tpmtool command executed successfully and contains "TPM Present" + output.status.success() && String::from_utf8_lossy(&output.stdout).contains("TPM Present: Yes") + } + Err(_) => false, + } + } + + #[cfg(unix)] + fn check_tpm_presence_unix(&self) -> bool { + match std::fs::metadata(UNIX_TPM_PATH) { + Ok(metadata) => { + let file_type = metadata.st_mode() & libc::S_IFMT; + file_type == libc::S_IFCHR || file_type == libc::S_IFSOCK + }, + Err(_) => false, + } + } + + +} + +impl Annotator for TpmAnnotator { + type Error = crate::errors::Error; + fn annotate(&mut self, data: &[u8]) -> Result { + let hasher = new_hash_provider(&self.hash)?; + let signable: std::result::Result = serde_json::from_slice(data); + let key = match signable { + Ok(signable) => derive_hash(hasher, signable.seed.as_bytes()), + Err(_) => derive_hash(hasher, data), + }; + match gethostname::gethostname().to_str() { + Some(host) => { + #[cfg(unix)] + let is_satisfied = self.check_tpm_presence_unix(); + #[cfg(windows)] + let is_satisfied = self.check_tpm_presence_windows(); + + let mut annotation = Annotation::new(&key, self.hash.clone(), host, self.kind.clone(), is_satisfied); + let signature = serialise_and_sign(&self.sign, &annotation)?; + annotation.with_signature(&signature); + Ok(annotation) + }, + None => Err(Error::NoHostName) + } + } +} + + +#[cfg(test)] +mod tpm_tests { + use crate::{config, providers::sign_provider::get_priv_key}; + use crate::annotations::{Annotator, constants, TpmAnnotator}; + use crate::config::Signable; + #[cfg(unix)] + use super::UNIX_TPM_PATH; + + #[test] + fn valid_and_invalid_tpm_annotator() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let mut config2 = config.clone(); + config2.hash.hash_type = constants::HashType("Not a known hash type".to_string()); + + let data = String::from("Some random data"); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + + let signable = Signable::new(data, sig); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut tpm_annotator_1 = TpmAnnotator::new(&config).unwrap(); + let mut tpm_annotator_2 = TpmAnnotator::new(&config2).unwrap(); + + let valid_annotation = tpm_annotator_1.annotate(&serialised).unwrap(); + let invalid_annotation = tpm_annotator_2.annotate(&serialised); + + assert!(valid_annotation.validate_base()); + assert!(invalid_annotation.is_err()); + } + + + #[test] + fn make_tpm_annotation() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + + let data = String::from("Some random data"); + let priv_key_file = std::fs::read(&config.signature.private_key_info.path).unwrap(); + let priv_key_string = String::from_utf8(priv_key_file).unwrap(); + let priv_key = get_priv_key(&priv_key_string).unwrap(); + let sig = priv_key.sign(data.as_bytes()); + + let signable = Signable::new(data, hex::encode(sig.to_bytes())); + let serialised = serde_json::to_vec(&signable).unwrap(); + + let mut tpm_annotator = TpmAnnotator::new(&config).unwrap(); + let annotation = tpm_annotator.annotate(&serialised).unwrap(); + + assert!(annotation.validate_base()); + assert_eq!(annotation.kind, *constants::ANNOTATION_TPM); + assert_eq!(annotation.host, gethostname::gethostname().to_str().unwrap()); + assert_eq!(annotation.hash, config.hash.hash_type); + + #[cfg(unix)] + let should_be_satisfied = std::fs::metadata(UNIX_TPM_PATH).is_ok(); + #[cfg(windows)] + let should_be_satisfied = { + let output = std::process::Command::new("tpmtool") + .arg("getdeviceinformation") + .output(); + match output { + Ok(output) => output.status.success() && String::from_utf8_lossy(&output.stdout).contains("TPM Present: Yes"), + Err(_) => false + } + }; + + assert_eq!(annotation.is_satisfied, should_be_satisfied); + } +} \ No newline at end of file diff --git a/src/annotations/mod.rs b/src/annotations/mod.rs new file mode 100644 index 0000000..03e7fe3 --- /dev/null +++ b/src/annotations/mod.rs @@ -0,0 +1,14 @@ +mod annotators; + +pub use annotators::*; +pub use alvarium_annotator::{Annotation, Annotator, AnnotationList, constants}; + +pub fn mock_annotation() -> Annotation { + let key = "The hash of the contents"; + let hash = constants::SHA256_HASH.clone(); + let host = "Host Device"; + let kind = constants::ANNOTATION_SOURCE.clone(); + let satisfied = true; + + Annotation::new(key, hash, host, kind, satisfied) +} diff --git a/src/config/hash.rs b/src/config/hash.rs new file mode 100644 index 0000000..be5aaf4 --- /dev/null +++ b/src/config/hash.rs @@ -0,0 +1,15 @@ +use alvarium_annotator::constants::Validate; +use crate::annotations::constants::HashType; +use serde::{Serialize, Deserialize}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct HashInfo { + #[serde(rename="type")] + pub hash_type: HashType +} + +impl Validate for HashInfo { + fn validate(&self) -> bool { + self.hash_type.is_base_hash_type() + } +} \ No newline at end of file diff --git a/src/config/mod.rs b/src/config/mod.rs new file mode 100644 index 0000000..f591fa2 --- /dev/null +++ b/src/config/mod.rs @@ -0,0 +1,39 @@ +mod hash; +mod sdk; +mod sign; +mod stream; + +pub use hash::*; +pub use sdk::*; +pub use sign::*; +pub use stream::*; + + + +#[cfg(test)] +mod make_config_tests { + use super::{SdkInfo, StreamInfo, DemiaStreamsConfig, MqttStreamConfig}; + #[test] + fn new_config() { + let config: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + assert!(config.hash.hash_type.is_base_hash_type()); + assert!(config.signature.private_key_info.key_type.is_base_key_algorithm()); + assert!(config.annotators[0].is_base_annotation_type()); + } + + #[test] + fn demia_streams_config() { + let config: StreamInfo = serde_json::from_slice(crate::DEMIA_TEST_CONFIG_BYTES.as_slice()).unwrap(); + let _is_config: DemiaStreamsConfig; + assert!(config.stream_type.is_base_stream_type()); + assert!(matches!(config.config, _is_config)); + } + + #[test] + fn mqtt_stream_config() { + let config: StreamInfo = serde_json::from_slice(crate::MQTT_TEST_CONFIG_BYTES.as_slice()).unwrap(); + let _mqtt_config: MqttStreamConfig; + assert!(config.stream_type.is_base_stream_type()); + assert!(matches!(config.config, _mqtt_config)); + } +} diff --git a/src/config/sdk.rs b/src/config/sdk.rs new file mode 100644 index 0000000..5fc2f47 --- /dev/null +++ b/src/config/sdk.rs @@ -0,0 +1,29 @@ +use serde::{Serialize, Deserialize}; +use crate::config::{HashInfo, SignatureInfo, StreamInfo}; +use crate::annotations::constants::AnnotationType; + + +fn level_info() -> String { + "info".to_string() +} +fn debug_location() -> String { + "log.out".to_string() +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SdkInfo { + pub annotators: Vec, + pub hash: HashInfo, + pub signature: SignatureInfo, + pub stream: StreamInfo, + #[serde(default)] + pub logging: LoggingConfiguration, +} + +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct LoggingConfiguration { + #[serde(default = "level_info")] + pub level: String, + #[serde(default = "debug_location")] + pub debug_location: String, +} diff --git a/src/config/sign.rs b/src/config/sign.rs new file mode 100644 index 0000000..9f5c278 --- /dev/null +++ b/src/config/sign.rs @@ -0,0 +1,17 @@ +use alvarium_annotator::constants::KeyAlgorithm; +use serde::{Serialize, Deserialize}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SignatureInfo { + #[serde(rename="public")] + pub public_key_info: KeyInfo, + #[serde(rename="private")] + pub private_key_info: KeyInfo, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct KeyInfo { + #[serde(rename="type")] + pub key_type: KeyAlgorithm, + pub path: String +} \ No newline at end of file diff --git a/src/config/stream/demia_streams.rs b/src/config/stream/demia_streams.rs new file mode 100644 index 0000000..924e300 --- /dev/null +++ b/src/config/stream/demia_streams.rs @@ -0,0 +1,18 @@ +use serde::{Serialize, Deserialize}; +use crate::config::UrlInfo; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct DemiaStreamsConfig { + pub backup: DemiaStreamsBackup, + pub provider: UrlInfo, + #[serde(rename="tangle")] + pub tangle_node: UrlInfo, + pub encoding: String, + pub topic: String, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct DemiaStreamsBackup { + pub path: String, + pub password: String, +} diff --git a/src/config/stream/mod.rs b/src/config/stream/mod.rs new file mode 100644 index 0000000..4e65f56 --- /dev/null +++ b/src/config/stream/mod.rs @@ -0,0 +1,122 @@ +mod demia_streams; +mod mqtt; + +use alvarium_annotator::{SignProvider, StreamConfigWrapper}; +pub use demia_streams::*; +pub use mqtt::*; + +use serde::{Serialize, Deserialize}; + +use crate::annotations::constants::StreamType; +use crate::providers::sign_provider::SignatureProviderWrap; +use crate::errors::{Error, Result}; + + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct StreamInfo { + #[serde(rename="type")] + pub stream_type: StreamType, + pub config: StreamConfig +} + +impl StreamConfigWrapper for StreamInfo { + fn stream_type(&self) -> &StreamType { + &self.stream_type + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct UrlInfo { + pub host: String, + pub port: usize, + pub protocol: String +} + +impl UrlInfo { + pub fn uri(&self) -> String { + format!("{}://{}:{}", self.protocol, self.host, self.port) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum StreamConfig { + DemiaStreams(DemiaStreamsConfig), + MQTT(MqttStreamConfig), +} + + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Signable { + pub seed: String, + pub signature: String +} + +impl Signable { + pub fn new(seed: String, signature: String) -> Self { + Signable { seed, signature } + } + + pub fn verify_signature(&self, provider: &SignatureProviderWrap) -> Result { + if self.signature.is_empty() { + return Err(Error::EmptySignature) + } + + match provider { + SignatureProviderWrap::Ed25519(provider)=> { + let sig_bytes = hex::decode(&self.signature)?; + Ok(provider.verify(self.seed.as_bytes(), &sig_bytes)?) + } + } + } + + pub fn to_bytes(&self) -> Vec { + // Strings should not fail to serde + // TODO: Verify that this is the case + serde_json::to_vec(&self).unwrap() + } +} + + +#[cfg(test)] +mod config_tests { + use crypto::signatures::ed25519::SecretKey; + use crate::providers::sign_provider::{Ed25519Provider, SignatureProviderWrap}; + use crate::config; + use super::Signable; + use alvarium_annotator::SignProvider; + + #[tokio::test] + async fn verify_signable() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let sig_provider = SignatureProviderWrap::Ed25519(Ed25519Provider::new(&config.signature).unwrap()); + + let data = "A data packet to sign".to_string(); + let sig = sig_provider.sign(data.as_bytes()).unwrap(); + + let signable = Signable { + seed: data, + signature: sig + }; + + assert!(signable.verify_signature(&sig_provider).unwrap()) + } + + #[test] + fn failed_verification_signable() { + let config: config::SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let bad_priv_key = SecretKey::generate().unwrap(); + + let data = "A data packet to sign".to_string(); + let raw_sig = bad_priv_key.sign(data.as_bytes()); + + let signable = Signable { + seed: data, + signature: hex::encode(raw_sig.to_bytes()) + }; + + let sig_provider = SignatureProviderWrap::Ed25519(Ed25519Provider::new(&config.signature).unwrap()); + + assert!(!signable.verify_signature(&sig_provider).unwrap()) + } +} \ No newline at end of file diff --git a/src/config/stream/mqtt.rs b/src/config/stream/mqtt.rs new file mode 100644 index 0000000..209612b --- /dev/null +++ b/src/config/stream/mqtt.rs @@ -0,0 +1,24 @@ +use serde::{Serialize, Deserialize}; +use crate::config::UrlInfo; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct MqttStreamConfig { + #[serde(rename="clientId")] + pub client_id: String, + #[serde(rename="boundedCap")] + pub cap: usize, + #[serde(rename="keepAlive")] + pub keep_alive: u8, + pub qos: u8, + pub user: String, + password: String, + pub provider: UrlInfo, + pub cleanness: bool, + pub topics: Vec +} + +impl MqttStreamConfig { + pub(crate) fn password(&self) -> &str { + &self.password + } +} \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..8633249 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,120 @@ +use thiserror::Error; +pub type Result = core::result::Result; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Fern error: {0}")] + LoggerSetupError(log::SetLoggerError), + + #[error("Logging format error: {0}")] + LoggerFormattingError(std::io::Error), + + #[error("Failed to deserialize: {0}")] + DeserializeError(serde_json::Error), + + #[error("HTTP Client error: {0}")] + HttpClientError(reqwest::Error), + + #[error("Core Alvarium error: {0}")] + AlvariumCoreError(alvarium_annotator::Error), + + #[error("Not a pre known Alvarium annotator: {0}. Should be built separately")] + NotKnownProvider(String), + + #[error("Streams Provider error: {0}")] + StreamsError(streams::Error), + + #[error("Streams LETS error: {0}")] + StreamsLetsError(streams::LetsError), + + #[error("Mqtt Client error: {0}")] + MqttClientError(rumqttc::ClientError), + + #[error("Mqtt Connection error: {0}")] + MqttConnectionError(rumqttc::ConnectionError), + + #[error("Mqtt Connect Return error: {0}")] + MqttConnectReturnError(String), + + #[error("Did not find keyload, subscription may not have been processed correctly")] + StreamsKeyloadNotFound, + + #[error("Malformed or incorrect configuration provided")] + IncorrectConfig, + + #[error("Empty signature field")] + EmptySignature, + + #[error("Could not retrieve host name")] + NoHostName, + + #[error("Hex decoding failed: {0}")] + HexDecodeFailure(hex::FromHexError), + + #[error("Array is not the correct size: {0}/{1}")] + IncorrectKeySize(usize, usize), + + #[error("Failed to make public key from provided bytes")] + PublicKeyFailure, + + #[error("External error: {0}")] + External(Box), + + #[error("Backup failed: {0}")] + BackupFailed(std::io::Error) +} + +impl From for Error { + fn from(e: serde_json::Error) -> Self { + Error::DeserializeError(e) + } +} + +impl From for Error { + fn from(e: hex::FromHexError) -> Self { + Error::HexDecodeFailure(e) + } +} + +impl From for Error { + fn from(e: alvarium_annotator::Error) -> Self { + Error::AlvariumCoreError(e) + } +} + +impl From for Error { + fn from(e: streams::Error) -> Self { + Error::StreamsError(e) + } +} + + +impl From for Error { + fn from(e: rumqttc::ClientError) -> Self { + Error::MqttClientError(e) + } +} + +impl From for Error { + fn from(e: rumqttc::ConnectionError) -> Self { + Error::MqttConnectionError(e) + } +} + +impl From for Error { + fn from(e: rumqttc::ConnectReturnCode) -> Self { + Error::MqttConnectReturnError(format!("{:?}", e)) + } +} + +impl From for Error { + fn from(e: reqwest::Error) -> Self { + Error::HttpClientError(e) + } +} + +impl From for Error { + fn from(e: streams::LetsError) -> Self { + Error::StreamsLetsError(e) + } +} \ No newline at end of file diff --git a/src/factories/annotator_factory.rs b/src/factories/annotator_factory.rs new file mode 100644 index 0000000..bc46fc0 --- /dev/null +++ b/src/factories/annotator_factory.rs @@ -0,0 +1,20 @@ +use alvarium_annotator::constants; +use crate::SdkAnnotator; +use crate::annotations::{PkiAnnotator, SourceAnnotator, TlsAnnotator, TpmAnnotator}; +use crate::config::SdkInfo; +use crate::errors::{Error, Result}; + + +pub fn new_annotator(kind: constants::AnnotationType, cfg: SdkInfo) -> Result> { + if !kind.is_base_annotation_type() { + return Err(Error::NotKnownProvider(kind.kind().to_string())) + } + + match kind.kind() { + "source" => Ok(Box::new(SourceAnnotator::new(&cfg)?)), + "pki" => Ok(Box::new(PkiAnnotator::new(&cfg)?)), + "tls" => Ok(Box::new(TlsAnnotator::new(&cfg)?)), + "tpm" => Ok(Box::new(TpmAnnotator::new(&cfg)?)), + _ => Err(Error::NotKnownProvider(kind.kind().to_string())) + } +} \ No newline at end of file diff --git a/src/factories/hash_factory.rs b/src/factories/hash_factory.rs new file mode 100644 index 0000000..54d2fb2 --- /dev/null +++ b/src/factories/hash_factory.rs @@ -0,0 +1,18 @@ +use alvarium_annotator::constants; +use crate::providers::hash_provider::{HashProviderWrapper, MD5Provider, NoneProvider, Sha256Provider}; +use crate::errors::{Error, Result}; + +pub fn new_hash_provider(kind: &constants::HashType) -> Result { + if !kind.is_base_hash_type() { + return Err(Error::NotKnownProvider(kind.0.clone())) + } + + + match kind.0.as_str() { + "md5" => Ok(HashProviderWrapper::MD5(MD5Provider::new())), + "sha256" => Ok(HashProviderWrapper::Sha256(Sha256Provider::new())), + "none" => Ok(HashProviderWrapper::None(NoneProvider::new())), + _ => Err(Error::NotKnownProvider(kind.0.clone())) + } +} + diff --git a/src/factories/mod.rs b/src/factories/mod.rs new file mode 100644 index 0000000..491508c --- /dev/null +++ b/src/factories/mod.rs @@ -0,0 +1,43 @@ +mod stream_factory; +mod annotator_factory; +mod hash_factory; +mod signature_factory; + + +pub use stream_factory::*; +pub use annotator_factory::*; +pub use hash_factory::*; +pub use signature_factory::*; + + +#[cfg(test)] +mod factory_tests { + use crate::config::SdkInfo; + use crate::factories::{new_annotator, new_hash_provider, new_signature_provider, new_stream_provider}; + + #[tokio::test] + async fn provider_factory() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let _provider = new_stream_provider(sdk_info.stream).await.unwrap(); + } + + #[tokio::test] + async fn hasher_factory() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let _provider = new_hash_provider(&sdk_info.hash.hash_type).unwrap(); + } + + #[tokio::test] + async fn signature_factory() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let _provider = new_signature_provider(&sdk_info.signature).unwrap(); + } + + #[tokio::test] + async fn annotator_factory() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + for ann in &sdk_info.annotators { + let _annotator = new_annotator(ann.clone(), sdk_info.clone()).unwrap(); + } + } +} \ No newline at end of file diff --git a/src/factories/signature_factory.rs b/src/factories/signature_factory.rs new file mode 100644 index 0000000..4dec7a4 --- /dev/null +++ b/src/factories/signature_factory.rs @@ -0,0 +1,15 @@ +use crate::config::SignatureInfo; +use crate::errors::{Result, Error::NotKnownProvider}; +use crate::providers::sign_provider::{Ed25519Provider, SignatureProviderWrap}; + + +pub fn new_signature_provider(config: &SignatureInfo) -> Result { + if !config.private_key_info.key_type.is_base_key_algorithm() { + return Err(NotKnownProvider(config.private_key_info.key_type.0.clone())) + } + + match config.private_key_info.key_type.0.as_str() { + "ed25519" => Ok(SignatureProviderWrap::Ed25519(Ed25519Provider::new(config)?)), + _ => Err(NotKnownProvider(config.private_key_info.key_type.0.clone())) + } +} \ No newline at end of file diff --git a/src/factories/stream_factory.rs b/src/factories/stream_factory.rs new file mode 100644 index 0000000..f6ae38c --- /dev/null +++ b/src/factories/stream_factory.rs @@ -0,0 +1,18 @@ +use alvarium_annotator::Publisher; +use crate::errors::Result; +use crate::config::{StreamConfig, StreamInfo}; +use crate::providers::stream_provider::{DemiaPublisher, MqttPublisher, PublisherWrap}; + + +pub async fn new_stream_provider(cfg: StreamInfo) -> Result { + match cfg.config { + StreamConfig::DemiaStreams(_) => { + let publisher = DemiaPublisher::new(&cfg).await?; + Ok(PublisherWrap::Demia(publisher)) + } + StreamConfig::MQTT(_) => { + let publisher = MqttPublisher::new(&cfg).await?; + Ok(PublisherWrap::Mqtt(publisher)) + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3c24c2c --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,30 @@ +extern crate core; + +pub mod config; +pub mod sdk; +pub mod providers; +pub mod annotations; +pub mod factories; +pub mod logging; +pub mod errors; + +pub type SdkAnnotator = dyn alvarium_annotator::Annotator; + +#[macro_use] +extern crate lazy_static; +// Creates a static CONFIG_BYTES value from the ./config.json file if it exists. If it does not +// it will use the base test_config.json to put into this value +lazy_static! { + pub static ref CONFIG_BYTES: Vec = { + match std::fs::read("config.json") { + Ok(config_bytes) => config_bytes, + Err(_) => std::fs::read("resources/test_config.json").unwrap() + } + }; + pub static ref MQTT_TEST_CONFIG_BYTES: Vec = { + std::fs::read("resources/mqtt_stream_config.json").unwrap() + }; + pub static ref DEMIA_TEST_CONFIG_BYTES: Vec = { + std::fs::read("resources/demia_streams_config.json").unwrap() + }; +} diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..0bc2be0 --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,39 @@ +// src/logger/mod.rs + +use crate::config::SdkInfo; +use log::LevelFilter; +use crate::errors::{Result, Error}; + +pub fn init(config: &SdkInfo) -> Result<()> { + let log_level = match config.logging.level.as_str() { + "debug" => LevelFilter::Debug, + "warn" => LevelFilter::Warn, + "error" => LevelFilter::Error, + _ => LevelFilter::Info, + }; + + fern::Dispatch::new() + .format(|out, message, record| { + + let source = format!("{}:{}", record.target(), record.line().unwrap_or_default()); + let gap = if source.len() < 35 { + " ".repeat(35 - source.len()) + } else { + " ".to_string() + }; + + out.finish(format_args!( + "[{} | {:6}| {}]{} {}", + chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"), + record.level(), + source, + gap, + message + )) + }) + .level(log_level) + .chain(std::io::stdout()) + .apply() + .map_err(Error::LoggerSetupError)?; + Ok(()) +} diff --git a/src/providers/hash_provider/md5_provider.rs b/src/providers/hash_provider/md5_provider.rs new file mode 100644 index 0000000..8827105 --- /dev/null +++ b/src/providers/hash_provider/md5_provider.rs @@ -0,0 +1,60 @@ +use alvarium_annotator::HashProvider; +use md5_rs::Context; + +#[derive(Default)] +pub struct MD5Provider {} + +impl MD5Provider { + pub fn new() -> Self { + MD5Provider::default() + } +} + +impl HashProvider for MD5Provider { + fn derive(&self, data: &[u8]) -> String { + let mut ctx = Context::new(); + ctx.read(data); + hex::encode(ctx.finish()) + } +} + + +#[test] +fn md5_provider_test() { + use log::info; + struct Case<'a> { + name: &'a str, + data: &'a[u8], + expected: &'a str, + } + + let cases: Vec = vec![ + Case { + name: "text variation 1", + data: "foo".as_bytes(), + expected: "acbd18db4cc2f85cedef654fccc4a4d8", + }, + Case { + name: "text variation 2", + data: "bar".as_bytes(), + expected: "37b51d194a7513e45b56f6524f2d51f2", + }, + Case { + name: "text variation 3", + data: "baz".as_bytes(), + expected: "73feffa4b7f6bb68e44cf984c85f6e88", + }, + Case { + name: "byte sequence", + data: &[1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 0], + expected: "7f63cb6d067972c3f34f094bb7e776a8", + }, + ]; + + let hash_provider = MD5Provider::new(); + for case in cases { + info!("Testing Case: {}", case.name); + let hash = hash_provider.derive(case.data); + assert_eq!(case.expected, hash) + } +} \ No newline at end of file diff --git a/src/providers/hash_provider/mod.rs b/src/providers/hash_provider/mod.rs new file mode 100644 index 0000000..4edeff8 --- /dev/null +++ b/src/providers/hash_provider/mod.rs @@ -0,0 +1,29 @@ +mod md5_provider; +mod none_provider; +mod sha256_provider; + +pub use md5_provider::MD5Provider; +pub use none_provider::NoneProvider; +pub use sha256_provider::Sha256Provider; + +pub enum HashProviderWrapper { + MD5(MD5Provider), + Sha256(Sha256Provider), + None(NoneProvider) +} + +impl alvarium_annotator::HashProvider for HashProviderWrapper { + fn derive(&self, data: &[u8]) -> String { + match self { + HashProviderWrapper::MD5(md5) => { + md5.derive(data) + }, + HashProviderWrapper::Sha256(sha256) => { + sha256.derive(data) + } + HashProviderWrapper::None(none) => { + none.derive(data) + } + } + } +} \ No newline at end of file diff --git a/src/providers/hash_provider/none_provider.rs b/src/providers/hash_provider/none_provider.rs new file mode 100644 index 0000000..6224842 --- /dev/null +++ b/src/providers/hash_provider/none_provider.rs @@ -0,0 +1,54 @@ +use alvarium_annotator::HashProvider; + +#[derive(Default)] +pub struct NoneProvider {} + +impl NoneProvider { + pub fn new() -> Self { + NoneProvider {} + } +} + +impl HashProvider for NoneProvider { + fn derive(&self, data: &[u8]) -> String { + unsafe { + String::from_utf8_unchecked(data.to_vec()) + } + } +} + + +#[test] +fn md5_provider_test() { + use log::info; + struct Case<'a> { + name: &'a str, + data: &'a[u8], + expected: &'a str, + } + + let cases: Vec = vec![ + Case { + name: "text variation 1", + data: "foo".as_bytes(), + expected: "foo", + }, + Case { + name: "text variation 2", + data: "bar".as_bytes(), + expected: "bar", + }, + Case { + name: "text variation 3", + data: "baz".as_bytes(), + expected: "baz", + }, + ]; + + for case in cases { + info!("Testing Case: {}", case.name); + let hash_provider = NoneProvider::new(); + let hash = hash_provider.derive(case.data); + assert_eq!(case.expected, hash) + } +} \ No newline at end of file diff --git a/src/providers/hash_provider/sha256_provider.rs b/src/providers/hash_provider/sha256_provider.rs new file mode 100644 index 0000000..9d8c5f8 --- /dev/null +++ b/src/providers/hash_provider/sha256_provider.rs @@ -0,0 +1,60 @@ +use alvarium_annotator::HashProvider; +use crypto::hashes::sha::{SHA256, SHA256_LEN}; + +#[derive(Default)] +pub struct Sha256Provider {} + +impl Sha256Provider { + pub fn new() -> Self { + Sha256Provider::default() + } +} + +impl HashProvider for Sha256Provider { + fn derive(&self, data: &[u8]) -> String { + let mut digest = [0_u8; SHA256_LEN]; + SHA256(data, &mut digest); + hex::encode(digest) + } +} + + +#[test] +fn sha256_provider_test() { + use log::info; + struct Case<'a> { + name: &'a str, + data: &'a[u8], + expected: &'a str, + } + + let cases: Vec = vec![ + Case { + name: "text variation 1", + data: "foo".as_bytes(), + expected: "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae", + }, + Case { + name: "text variation 2", + data: "bar".as_bytes(), + expected: "fcde2b2edba56bf408601fb721fe9b5c338d10ee429ea04fae5511b68fbf8fb9", + }, + Case { + name: "text variation 3", + data: "baz".as_bytes(), + expected: "baa5a0964d3320fbc0c6a922140453c8513ea24ab8fd0577034804a967248096", + }, + Case { + name: "byte sequence", + data: &[1_u8, 2, 3, 4, 5, 6, 7, 8, 9, 0], + expected: "9a89c68c4c5e28b8c4a5567673d462fff515db46116f9900624d09c474f593fb", + }, + ]; + + for case in cases { + info!("Testing Case: {}", case.name); + let hash_provider = Sha256Provider::new(); + let hash = hash_provider.derive(case.data); + assert_eq!(case.expected, hash) + } +} \ No newline at end of file diff --git a/src/providers/mod.rs b/src/providers/mod.rs new file mode 100644 index 0000000..c04cd65 --- /dev/null +++ b/src/providers/mod.rs @@ -0,0 +1,3 @@ +pub mod sign_provider; +pub mod hash_provider; +pub mod stream_provider; diff --git a/src/providers/sign_provider/ed25519.rs b/src/providers/sign_provider/ed25519.rs new file mode 100644 index 0000000..8814605 --- /dev/null +++ b/src/providers/sign_provider/ed25519.rs @@ -0,0 +1,73 @@ +use alvarium_annotator::SignProvider; +use crate::config::SignatureInfo; +use crate::errors::{Error, Result}; +use crypto::signatures::ed25519::{ + PublicKey, SecretKey, Signature, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH, +}; + +pub struct Ed25519Provider { + public: PublicKey, + private: SecretKey + +} + +impl Ed25519Provider { + pub fn new(config: &SignatureInfo) -> Result { + let pub_key_file = std::fs::read(&config.public_key_info.path).unwrap(); + let pub_key_string = String::from_utf8(pub_key_file).unwrap(); + let pk = get_pub_key(&pub_key_string)?; + + let priv_key_file = std::fs::read(&config.private_key_info.path).unwrap(); + let priv_key_string = String::from_utf8(priv_key_file).unwrap(); + let sk = get_priv_key(&priv_key_string)?; + + Ok(Ed25519Provider { + public: pk, + private: sk, + }) + } +} + +impl SignProvider for Ed25519Provider { + type Error = crate::errors::Error; + fn sign(&self, content: &[u8]) -> Result { + Ok(hex::encode(self.private.sign(content).to_bytes())) + } + + + fn verify(&self, content: &[u8], signed: &[u8]) -> Result { + let sig = get_signature(signed)?; + Ok(self.public.verify(&sig,content)) + } +} + + +pub(crate) fn get_priv_key(key: &str) -> Result { + let decoded_key = hex::decode(key)?; + match <[u8;SECRET_KEY_LENGTH]>::try_from(decoded_key.as_slice()) { + Ok(resized) => Ok(SecretKey::from_bytes(resized)), + Err(_) => Err(Error::IncorrectKeySize(decoded_key.len(), SECRET_KEY_LENGTH)) + } +} + + +pub(crate) fn get_pub_key(key: &str) -> Result { + let decoded_key = hex::decode(key)?; + match <[u8;PUBLIC_KEY_LENGTH]>::try_from(decoded_key.as_slice()) { + Ok(resized) => { + match PublicKey::try_from_bytes(resized) { + Ok(pub_key) => Ok(pub_key), + Err(_) => Err(Error::PublicKeyFailure) + } + } + Err(_) => Err(Error::IncorrectKeySize(decoded_key.len(), PUBLIC_KEY_LENGTH)) + } +} + + +fn get_signature(signature: &[u8]) -> Result { + match <[u8;SIGNATURE_LENGTH]>::try_from(signature) { + Ok(resized) => Ok(Signature::from_bytes(resized)), + Err(_) => Err(Error::IncorrectKeySize(signature.len(), SIGNATURE_LENGTH)) + } +} \ No newline at end of file diff --git a/src/providers/sign_provider/mod.rs b/src/providers/sign_provider/mod.rs new file mode 100644 index 0000000..3aa6246 --- /dev/null +++ b/src/providers/sign_provider/mod.rs @@ -0,0 +1,24 @@ +mod ed25519; + +use crate::errors::Result; +pub use ed25519::*; + +pub enum SignatureProviderWrap { + Ed25519(Ed25519Provider) +} + +impl alvarium_annotator::SignProvider for SignatureProviderWrap { + type Error = crate::errors::Error; + fn sign(&self, content: &[u8]) -> Result { + match self { + SignatureProviderWrap::Ed25519(provider) => Ok(provider.sign(content)?) + } + } + + fn verify(&self, content: &[u8], signed: &[u8]) -> Result { + match self { + SignatureProviderWrap::Ed25519(provider) => Ok(provider.verify(content, signed)?) + } + } + +} \ No newline at end of file diff --git a/src/providers/stream_provider/demia.rs b/src/providers/stream_provider/demia.rs new file mode 100644 index 0000000..bd6174f --- /dev/null +++ b/src/providers/stream_provider/demia.rs @@ -0,0 +1,302 @@ +use crate::config::{DemiaStreamsConfig, StreamConfig, StreamInfo}; +use alvarium_annotator::{MessageWrapper, Publisher}; +use streams::{Address, User, transport::utangle::Client, id::{Ed25519, Identifier}, Message}; +use core::str::FromStr; +use std::thread::sleep; +use std::time::Duration; +use serde::{Serialize, Deserialize}; +use futures::TryStreamExt; +use log::{debug, info}; +use crate::errors::{Error, Result}; + +const MAX_RETRIES: u8 = 100; + + +pub struct DemiaPublisher { + cfg: DemiaStreamsConfig, + user: User, + identifier: Identifier, +} + + +impl DemiaPublisher { + pub(crate) async fn await_keyload(&mut self) -> Result<()> { + let mut i = 0; + info!("Awaiting Keyload message from publisher"); + while i < MAX_RETRIES { + let m = self.user.messages(); + if let Ok(next_messages) = m.try_collect::>().await { + for message in next_messages { + debug!("Found message: {}", message.address); + if let Some(keyload) = message.as_keyload() { + debug!("Found keyload"); + if keyload.includes_subscriber(&self.identifier) { + return Ok(()) + } + } + } + } + sleep(Duration::from_secs(5)); + i += 1; + } + Err(Error::StreamsKeyloadNotFound) + } + + pub fn client(&mut self) -> &mut User { + &mut self.user + } + + pub fn identifier(&self) -> &Identifier { + &self.identifier + } +} + +#[async_trait::async_trait] +impl Publisher for DemiaPublisher { + type StreamConfig = StreamInfo; + type Error = crate::errors::Error; + async fn new(cfg: &StreamInfo) -> Result { + match &cfg.config { + StreamConfig::DemiaStreams(cfg) => { + let client = Client::new(cfg.tangle_node.uri()); + match std::fs::read(&cfg.backup.path) { + Ok(user_bytes) => { + let user = User::restore(user_bytes, &cfg.backup.password, client).await?; + let identifier = user.identifier().unwrap().clone(); + Ok( + DemiaPublisher { + cfg: cfg.clone(), + user, + identifier + } + ) + }, + Err(_) => { + let mut seed = [0u8; 64]; + crypto::utils::rand::fill(&mut seed).unwrap(); + + let user = User::builder() + .with_transport(client) + .with_identity(Ed25519::from_seed(seed)) + .lean() + .build(); + + let identifier = user.identifier().unwrap().clone(); + Ok( + DemiaPublisher { + cfg: cfg.clone(), + user, + identifier, + } + ) + } + } + }, + _ => Err(Error::IncorrectConfig) + } + } + + async fn close(&mut self) -> Result<()> { + // No need to disconnect from stream or drop anything + Ok(()) + } + + async fn reconnect(&mut self) -> Result<()> { + // No need to reconnect as disconnection does not occur + Ok(()) + } + async fn connect(&mut self) -> Result<()> { + if self.user.stream_address().is_none() { + let announcement = get_announcement_id(&self.cfg.provider.uri()).await?; + let announcement_address = Address::from_str(&announcement)?; + info!("Announcement address: {}", announcement_address.to_string()); + + debug!("Fetching announcement message"); + self.user.receive_message(announcement_address).await?; + + debug!("Sending Streams Subscription message"); + let subscription = self.user.subscribe().await?; + + #[cfg(feature = "did-streams")] + let id_type = 1; + #[cfg(not(feature = "did-streams"))] + let id_type = 0; + + let body = SubscriptionRequest { + address: subscription.address().to_string(), + identifier: self.identifier.to_string(), + id_type, + topic: self.cfg.topic.to_string(), + }; + + let body_bytes = serde_json::to_vec(&body)?; + + info!("Sending subscription request to console"); + send_subscription_request(&self.cfg.provider.uri(), body_bytes).await?; + self.await_keyload().await?; + } + Ok(()) + } + + async fn publish(&mut self, msg: MessageWrapper<'_>) -> Result<()> { + debug!("Publishing message: {:?}", msg); + let bytes = serde_json::to_vec(&msg)?; + + let packet = self.user.message() + .with_payload(bytes) + .with_topic(self.cfg.topic.as_str()) + .signed() + .send() + .await?; + + let backup = self.user.backup(&self.cfg.backup.password).await?; + std::fs::write(&self.cfg.backup.path, backup).map_err(Error::BackupFailed)?; + info!("Published new message: {}", packet.address()); + Ok(()) + } +} + +async fn get_announcement_id(uri: &str) -> Result { + #[derive(Serialize, Deserialize)] + struct AnnouncementResponse { + announcement_id: String + } + + info!("Fetching stream announcement id"); + let client = reqwest::Client::new(); + let response = client.get(uri.to_owned() + "/get_announcement_id") + .send() + .await? + .bytes() + .await?; + + let announcement: AnnouncementResponse = serde_json::from_slice(&response)?; + Ok(announcement.announcement_id) +} + + +#[derive(Serialize, Deserialize)] +struct SubscriptionRequest { + address: String, + identifier: String, + #[serde(rename="idType")] + id_type: u8, + topic: String, +} +async fn send_subscription_request(uri: &str, body: Vec) -> Result<()> { + reqwest::Client::new() + .post(uri.to_owned() + "/subscribe") + .body(body) + .header("Content-Type", "application/json") + .send() + .await?; + Ok(()) +} + + + +#[cfg(test)] +mod demia_test { + use log::info; + use crate::{ + annotations::{AnnotationList, Annotator, PkiAnnotator}, + config::{SdkInfo, StreamConfig, Signable} + }; + use streams::id::{PermissionDuration, Permissioned}; + use super::{Client, DemiaPublisher, Ed25519, Publisher, MessageWrapper, User}; + const BASE_TOPIC: &'static str = "Base Topic"; + + #[tokio::test] + async fn new_demia_streams_provider() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let _annotator = mock_provider(sdk_info).await; + } + + #[tokio::test] + async fn streams_provider_publish() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let mut publisher = mock_provider(sdk_info.clone()).await; + + let raw_data_msg = "A packet to send to subscribers".to_string(); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + let signable = Signable::new(raw_data_msg, sig); + + let mut list = AnnotationList { items: vec![] }; + let mut pki_annotator = PkiAnnotator::new(&sdk_info).unwrap(); + list.items.push( + pki_annotator.annotate( + &serde_json::to_vec(&signable).unwrap() + ).unwrap() + ); + + let data = MessageWrapper { + action: crate::annotations::constants::ACTION_CREATE.clone(), + message_type: std::any::type_name::(), + content: &base64::encode(&serde_json::to_vec(&list).unwrap()), + }; + + info!("Publishing..."); + publisher.publish(data).await.unwrap(); + std::fs::remove_file("temp_file").unwrap(); + } + + #[tokio::test] + async fn streams_provider_restore() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let mut provider = mock_provider(sdk_info.clone()).await; + let backup = provider.user.backup("password").await.unwrap(); + std::fs::write("temp_file", backup).unwrap(); + // If it made it here it's already confirmed that this is the case + if let StreamConfig::DemiaStreams(_config) = &sdk_info.stream.config { + // If no backup is available, then the restored publisher will have a new identity + let restored = DemiaPublisher::new(&sdk_info.stream).await.unwrap(); + assert!(restored.identifier.eq(provider.identifier())); + std::fs::remove_file("temp_file").unwrap(); + } + } + + async fn mock_provider(sdk_info: SdkInfo) -> DemiaPublisher { + if let StreamConfig::DemiaStreams(config) = &sdk_info.stream.config { + let client: Client = Client::new(&config.tangle_node.uri()); + let mut seed = [0u8; 64]; + crypto::utils::rand::fill(&mut seed).unwrap(); + + // Create an author to attach to + let mut streams_author = User::builder() + .with_transport(client) + .with_identity(Ed25519::from_seed(seed)) + .build(); + let announcement = streams_author.create_stream(BASE_TOPIC).await.unwrap(); + + let mut annotator = DemiaPublisher::new(&sdk_info.stream).await.unwrap(); + // To test connect, there needs to be a running provider (oracle) so we'll manually test + // this part + //annotator.connect().await.unwrap(); + + // Annotator will receive the announcement and send a subscription, in connect() it would + // send a subscription request to the oracle, for now we assume permission for connection + annotator.client().receive_message(announcement.address()).await.unwrap(); + let sub_message = annotator.client().subscribe().await.unwrap(); + + // Streams author accepts the subscription and dedicates a new branch specifically for + // the annotator + streams_author.receive_message(sub_message.address()).await.unwrap(); + streams_author.new_branch(BASE_TOPIC, config.topic.as_str()).await.unwrap(); + streams_author.send_keyload( + config.topic.as_str(), + vec![Permissioned::ReadWrite(annotator.identifier().clone(), PermissionDuration::Perpetual)], + vec![] + ) + .await + .unwrap(); + + annotator.await_keyload().await.unwrap(); + return annotator + } else { + panic!("Test configuration is not correct, should be demiaStreams config") + } + } +} + + + diff --git a/src/providers/stream_provider/mod.rs b/src/providers/stream_provider/mod.rs new file mode 100644 index 0000000..14b3473 --- /dev/null +++ b/src/providers/stream_provider/mod.rs @@ -0,0 +1,12 @@ +mod demia; +mod mqtt; + +pub use demia::DemiaPublisher; +pub use mqtt::MqttPublisher; + + +// TODO: Implement publisher for enum +pub enum PublisherWrap { + Demia(DemiaPublisher), + Mqtt(MqttPublisher), +} diff --git a/src/providers/stream_provider/mqtt.rs b/src/providers/stream_provider/mqtt.rs new file mode 100644 index 0000000..b9f0bb3 --- /dev/null +++ b/src/providers/stream_provider/mqtt.rs @@ -0,0 +1,146 @@ +use crate::config::{MqttStreamConfig, StreamConfig, StreamInfo}; +use rumqttc::{AsyncClient, ConnectionError, EventLoop, MqttOptions, QoS}; +use alvarium_annotator::{MessageWrapper, Publisher}; +use log::{debug, warn}; +use crate::errors::{Error, Result}; + +pub struct MqttPublisher { + cfg: MqttStreamConfig, + client: AsyncClient, + connection: EventLoop +} + + +#[async_trait::async_trait] +impl Publisher for MqttPublisher { + type StreamConfig = StreamInfo; + type Error = crate::errors::Error; + async fn new(cfg: &StreamInfo) -> Result { + match &cfg.config { + StreamConfig::MQTT(cfg)=> { + let (client, connection) = setup_client(cfg); + Ok(MqttPublisher { + cfg: cfg.clone(), + client, + connection + }) + } + _ => Err(Error::IncorrectConfig) + } + } + + async fn close(&mut self) -> Result<()> { + for topic in &self.cfg.topics { + self.client.unsubscribe(topic).await?; + } + Ok(self.client.disconnect().await?) + } + + async fn connect(&mut self) -> Result<()> { + self.reconnect().await + } + + async fn reconnect(&mut self) -> Result<()> { + debug!("Polling connection"); + if self.connection.poll().await.is_ok() { + return Ok(()) + } + + debug!("Making new client"); + let (client, mut connection) = setup_client(&self.cfg); + debug!("Polling for error in reconnection"); + if let Err(ConnectionError::ConnectionRefused(e)) = connection.poll().await { + warn!("Connection Error: {:?}", e); + return Err(e.into()) + } + + self.client = client; + self.connection = connection; + + let qos = qos(self.cfg.qos); + for topic in &self.cfg.topics { + debug!("Subscribing to topic: {}", topic); + self.client.subscribe(topic, qos).await? + } + Ok(()) + } + + async fn publish(&mut self, msg: MessageWrapper<'_>) -> Result<()> { + debug!("Reconnecting publisher"); + self.reconnect().await?; + let msg_str = serde_json::to_string(&msg)?; + let bytes = serde_json::to_vec(&msg)?; + for topic in &self.cfg.topics { + debug!("Posting {} to mqtt stream at topic {}", msg_str, topic); + self.client.publish(topic, qos(self.cfg.qos), true, bytes.clone()).await? + } + + Ok(()) + } +} + +fn setup_client(cfg: &MqttStreamConfig) -> (AsyncClient, EventLoop) { + let mut mqtt_options = MqttOptions::new(&cfg.client_id, &cfg.provider.host, cfg.provider.port as u16); + mqtt_options.set_keep_alive(tokio::time::Duration::from_secs(cfg.keep_alive as u64)); + mqtt_options.set_credentials(&cfg.user, cfg.password()); + mqtt_options.set_clean_session(cfg.cleanness); + + AsyncClient::new(mqtt_options, cfg.cap) +} + +fn qos(qos: u8) -> QoS { + match qos { + 1 => QoS::AtLeastOnce, + 2 => QoS::ExactlyOnce, + _ => QoS::AtMostOnce, + } +} + + +#[cfg(test)] +mod mqtt_tests { + use alvarium_annotator::{Annotator, AnnotationList, MessageWrapper, Publisher}; + use log::info; + use crate::annotations::PkiAnnotator; + use crate::config::{SdkInfo, Signable, StreamInfo}; + use crate::providers::stream_provider::MqttPublisher; + + #[tokio::test] + async fn new_mqtt_provider() { + let stream_config_bytes = std::fs::read("resources/mqtt_stream_config.json").unwrap(); + let stream_info: StreamInfo = serde_json::from_slice(stream_config_bytes.as_slice()).unwrap(); + let mut publisher = MqttPublisher::new(&stream_info).await.unwrap(); + publisher.close().await.unwrap(); + } + + #[tokio::test] + async fn mqtt_provider_publish() { + let sdk_info: SdkInfo = serde_json::from_slice(crate::CONFIG_BYTES.as_slice()).unwrap(); + let mqtt_stream_info: StreamInfo = serde_json::from_slice(crate::MQTT_TEST_CONFIG_BYTES.as_slice()).unwrap(); + + let mut publisher = MqttPublisher::new(&mqtt_stream_info).await.unwrap(); + publisher.connect().await.unwrap(); + + let raw_data_msg = "A packet to send to subscribers".to_string(); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + let signable = Signable::new(raw_data_msg, sig); + + let mut list = AnnotationList { items: vec![] }; + let mut pki_annotator = PkiAnnotator::new(&sdk_info).unwrap(); + list.items.push( + pki_annotator.annotate( + &serde_json::to_vec(&signable).unwrap() + ).unwrap() + ); + + let data = MessageWrapper { + action: crate::annotations::constants::ACTION_CREATE.clone(), + message_type: std::any::type_name::(), + content: &base64::encode(&serde_json::to_vec(&list).unwrap()), + }; + + info!("Publishing..."); + publisher.publish(data).await.unwrap(); + publisher.close().await.unwrap(); + } +} \ No newline at end of file diff --git a/src/sdk.rs b/src/sdk.rs new file mode 100644 index 0000000..117cd86 --- /dev/null +++ b/src/sdk.rs @@ -0,0 +1,206 @@ +use crate::config::{SdkInfo, StreamInfo}; +use crate::annotations::AnnotationList; +use alvarium_annotator::{MessageWrapper, Publisher}; +use alvarium_annotator::constants::{ACTION_CREATE, ACTION_MUTATE, ACTION_PUBLISH, ACTION_TRANSIT, ANNOTATION_SOURCE}; +use crate::factories::new_annotator; +use crate::errors::Result; +use crate::SdkAnnotator; + +pub struct SDK<'a, Pub: Publisher> { + annotators: &'a mut [Box], + pub cfg: SdkInfo, + stream: Pub +} + +impl<'a, Pub: Publisher> SDK<'a, Pub> { + pub async fn new(cfg: SdkInfo, annotators: &'a mut [Box]) -> Result> { + let mut publisher = Pub::new(&cfg.stream).await?; + publisher.connect().await?; + Ok(SDK { + annotators, + cfg, + stream: publisher, + }) + } + + pub async fn create(&mut self, data: &[u8]) -> Result<()> { + let mut ann_list = AnnotationList::default(); + for annotator in self.annotators.iter_mut() { + ann_list.items.push(annotator.annotate(data)?); + } + + let ann_bytes = serde_json::to_vec(&ann_list)?; + let wrapper = MessageWrapper { + action: ACTION_CREATE.clone(), + message_type: std::any::type_name::(), + content: &base64::encode(ann_bytes) + }; + self.stream.publish(wrapper).await + } + + pub async fn mutate(&mut self, old: &[u8], new: &[u8]) -> Result<()> { + let mut ann_list = AnnotationList::default(); + + let mut source = new_annotator(ANNOTATION_SOURCE.clone(), self.cfg.clone())?; + let annotation = source.annotate(old)?; + ann_list.items.push(annotation); + + for annotator in self.annotators.iter_mut() { + ann_list.items.push(annotator.annotate(new)?); + } + + let ann_bytes = serde_json::to_vec(&ann_list)?; + let wrapper = MessageWrapper { + action: ACTION_MUTATE.clone(), + message_type: std::any::type_name::(), + content: &base64::encode(ann_bytes) + }; + self.stream.publish(wrapper).await + } + + pub async fn transit(&mut self, data: &[u8]) -> Result<()> { + let mut ann_list = AnnotationList::default(); + for annotator in self.annotators.iter_mut() { + ann_list.items.push(annotator.annotate(data)?); + } + + let ann_bytes = serde_json::to_vec(&ann_list)?; + let wrapper = MessageWrapper { + action: ACTION_TRANSIT.clone(), + message_type: std::any::type_name::(), + content: &base64::encode(ann_bytes) + }; + self.stream.publish(wrapper).await + } + + pub async fn publish(&mut self, data: &[u8]) -> Result<()> { + let mut ann_list = AnnotationList::default(); + for annotator in self.annotators.iter_mut() { + ann_list.items.push(annotator.annotate(data)?); + } + + let ann_bytes = serde_json::to_vec(&ann_list)?; + let wrapper = MessageWrapper { + action: ACTION_PUBLISH.clone(), + message_type: std::any::type_name::(), + content: &base64::encode(ann_bytes) + }; + self.stream.publish(wrapper).await + } +} + + +#[cfg(test)] +mod sdk_tests { + use streams::{ + id::{Ed25519, PermissionDuration, Permissioned}, + transport::utangle::Client, + User, + }; + use alvarium_annotator::Publisher; + use crate::{config::{SdkInfo, StreamConfig, Signable}, CONFIG_BYTES, providers::stream_provider::DemiaPublisher}; + use crate::factories::new_annotator; + use super::SDK; + + const BASE_TOPIC: &'static str = "Base Topic"; + + #[tokio::test] + async fn sdk_create_transit_publish() { + // Uses base CONFIG_BYTES pulled from local config file (or the resources/test_config.json + // if no config file is present) + let sdk_info: SdkInfo = serde_json::from_slice(CONFIG_BYTES.as_slice()).unwrap(); + let publisher = mock_annotator(sdk_info.clone()).await; + + let mut annotators = Vec::new(); + for ann in &sdk_info.annotators { + let annotator = new_annotator(ann.clone(), sdk_info.clone()).unwrap(); + annotators.push(annotator) + } + + // Mocks SDK::new() without Pub::connect() + let mut sdk = SDK { + annotators: annotators.as_mut_slice(), + cfg: sdk_info.clone(), + stream: publisher, + }; + + let data = "A packet to send to subscribers".to_string(); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + let signable = Signable::new(data, sig); + sdk.create(signable.to_bytes().as_slice()).await.unwrap(); + sdk.transit(signable.to_bytes().as_slice()).await.unwrap(); + sdk.publish(signable.to_bytes().as_slice()).await.unwrap(); + std::fs::remove_file("temp_file").unwrap(); + } + + #[tokio::test] + async fn sdk_mutate() { + // Uses base CONFIG_BYTES pulled from local config file (or the resources/test_config.json + // if no config file is present) + let sdk_info: SdkInfo = serde_json::from_slice(CONFIG_BYTES.as_slice()).unwrap(); + let publisher = mock_annotator(sdk_info.clone()).await; + + let mut annotators = Vec::new(); + for ann in &sdk_info.annotators { + let annotator = new_annotator(ann.clone(), sdk_info.clone()).unwrap(); + annotators.push(annotator) + } + + // Mocks SDK::new() without Pub::connect() + let mut sdk = SDK { + annotators: annotators.as_mut_slice(), + cfg: sdk_info.clone(), + stream: publisher, + }; + + let data = "A packet to send to subscribers".to_string(); + let old_data = "Some old state of the data before mutation".to_string(); + let sig = hex::encode([0u8; crypto::signatures::ed25519::SIGNATURE_LENGTH]); + let signable = Signable::new(data, sig); + sdk.mutate(old_data.as_bytes(), signable.to_bytes().as_slice()).await.unwrap(); + std::fs::remove_file("temp_file").unwrap(); + } + + // Mocks Pub::new() with demiaPublisher Annotator + async fn mock_annotator(sdk_info: SdkInfo) -> DemiaPublisher { + if let StreamConfig::DemiaStreams(config) = &sdk_info.stream.config { + let client: Client = Client::new(&config.tangle_node.uri()); + let mut seed = [0u8; 64]; + crypto::utils::rand::fill(&mut seed).unwrap(); + + // Create an author to attach to + let mut streams_author = User::builder() + .with_transport(client) + .with_identity(Ed25519::from_seed(seed)) + .build(); + let announcement = streams_author.create_stream(BASE_TOPIC).await.unwrap(); + + let mut publisher = DemiaPublisher::new(&sdk_info.stream).await.unwrap(); + // To test connect, there needs to be a running provider (oracle) so we'll manually test + // this part + //annotator.connect().await.unwrap(); + + // Annotator will receive the announcement and send a subscription, in connect() it would + // send a subscription request to the oracle, for now we assume permission for connection + publisher.client().receive_message(announcement.address()).await.unwrap(); + let sub_message = publisher.client().subscribe().await.unwrap(); + + // Streams author accepts the subscription and dedicates a new branch specifically for + // the annotator + streams_author.receive_message(sub_message.address()).await.unwrap(); + streams_author.new_branch(BASE_TOPIC, config.topic.as_str()).await.unwrap(); + streams_author.send_keyload( + config.topic.as_str(), + vec![Permissioned::ReadWrite(publisher.identifier().clone(), PermissionDuration::Perpetual)], + vec![] + ) + .await + .unwrap(); + + publisher.await_keyload().await.unwrap(); + return publisher + } else { + panic!("Test configuration is not correct, should be demiaStreams config") + } + } +} \ No newline at end of file