Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline to storage_account_client #811

Merged
merged 7 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/core/src/headers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl Headers {
}

/// Get a header value given a specific header name
pub fn get(&self, key: &HeaderName) -> Option<&HeaderValue> {
self.0.get(key)
pub fn get<T: Into<HeaderName>>(&self, key: T) -> Option<&HeaderValue> {
self.0.get(&key.into())
}

/// Insert a header name/value pair
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl HttpClient for reqwest::Client {
}

async fn execute_request2(&self, request: &crate::Request) -> Result<crate::Response> {
let url = url::Url::parse(&request.uri().to_string())?;
let url = request.url().clone();
let mut reqwest_request = self.request(request.method(), url);
for (name, value) in request.headers().iter() {
reqwest_request = reqwest_request.header(name, value);
Expand Down
17 changes: 7 additions & 10 deletions sdk/core/src/mock/mock_request.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{Body, Request};
use http::{Method, Uri};
use http::Method;
use serde::de::Visitor;
use serde::ser::{Serialize, SerializeStruct, Serializer};
use serde::{Deserialize, Deserializer};
use std::collections::HashMap;
use std::str::FromStr;
use url::Url;

const FIELDS: &[&str] = &["uri", "method", "headers", "body"];

Expand Down Expand Up @@ -83,8 +84,11 @@ impl<'de> Visitor<'de> for RequestVisitor {
hm.insert(k.to_owned().into(), v.into());
}

// `url` cannot be relative
let url = Url::parse("http://example.com").unwrap();
let url = url.join(uri.1).expect("expected a valid uri");
Ok(Self::Value {
uri: Uri::from_str(uri.1).expect("expected a valid uri"),
url,
method: Method::from_str(method.1).expect("expected a valid HTTP method"),
headers: hm.into(),
body: bytes::Bytes::from(body).into(),
Expand All @@ -107,14 +111,7 @@ impl Serialize for Request {
}

let mut state = serializer.serialize_struct("Request", 4)?;
state.serialize_field(
FIELDS[0],
&self
.uri
.path_and_query()
.map(|p| p.to_string())
.unwrap_or_else(String::new),
)?;
state.serialize_field(FIELDS[0], &self.path_and_query())?;
state.serialize_field(FIELDS[1], &self.method.to_string())?;
state.serialize_field(FIELDS[2], &hm)?;
state.serialize_field(
Expand Down
8 changes: 2 additions & 6 deletions sdk/core/src/mock/player_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,8 @@ impl Policy for MockTransportPlayerPolicy {
let expected_request: Request = serde_json::from_str(&expected_request)?;
let expected_response = serde_json::from_str::<MockResponse>(&expected_response)?;

let expected_uri = expected_request.uri().to_string();
let actual_uri = request
.uri()
.path_and_query()
.map(|p| p.to_string())
.unwrap_or_else(String::new);
let expected_uri = expected_request.path_and_query();
let actual_uri = request.path_and_query();
if expected_uri != actual_uri {
return Err(Error::with_message(ErrorKind::MockFramework, || {
format!(
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/src/policies/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ impl Policy for TransportPolicy {

let response = { self.transport_options.http_client.execute_request2(request) };

Ok(response.await?)
response.await
}
}
35 changes: 21 additions & 14 deletions sdk/core/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::error::{ErrorKind, Result, ResultExt};
use crate::headers::{AsHeaders, Headers};
use crate::SeekableStream;
use bytes::Bytes;
use http::{Method, Uri};
use http::Method;
use std::fmt::Debug;
use std::str::FromStr;
use url::Url;

/// An HTTP Body.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -36,25 +35,38 @@ impl From<Box<dyn SeekableStream>> for Body {
/// body. Policies are expected to enrich the request by mutating it.
#[derive(Debug, Clone)]
pub struct Request {
pub(crate) uri: Uri,
pub(crate) url: Url,
pub(crate) method: Method,
pub(crate) headers: Headers,
pub(crate) body: Body,
}

impl Request {
/// Create a new request with an empty body and no headers
pub fn new(uri: Uri, method: Method) -> Self {
pub fn new(url: Url, method: Method) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes #809

Self {
uri,
url,
method,
headers: Headers::new(),
body: Body::Bytes(bytes::Bytes::new()),
}
}

pub fn uri(&self) -> &Uri {
&self.uri
pub fn url(&self) -> &Url {
&self.url
}

pub fn url_mut(&mut self) -> &mut Url {
&mut self.url
}

pub fn path_and_query(&self) -> String {
let mut result = self.url.path().to_owned();
if let Some(query) = self.url.query() {
result.push('?');
result.push_str(query)
}
result
}

pub fn method(&self) -> Method {
Expand Down Expand Up @@ -82,11 +94,6 @@ impl Request {
pub fn set_body(&mut self, body: impl Into<Body>) {
self.body = body.into();
}

/// Parse a `Uri` from a `str`
pub fn parse_uri(uri: &str) -> Result<Uri> {
Uri::from_str(uri).map_kind(ErrorKind::DataConversion)
}
}

/// Temporary hack to convert preexisting requests into the new format. It
Expand All @@ -95,7 +102,7 @@ impl From<http::Request<bytes::Bytes>> for Request {
fn from(request: http::Request<bytes::Bytes>) -> Self {
let (parts, body) = request.into_parts();
Self {
uri: parts.uri,
url: Url::parse(&parts.uri.to_string()).unwrap(),
method: parts.method,
headers: parts.headers.into(),
body: Body::Bytes(body),
Expand Down
30 changes: 15 additions & 15 deletions sdk/data_cosmos/src/authorization_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ use url::form_urlencoded;
const AZURE_VERSION: &str = "2018-12-31";
const VERSION: &str = "1.0";

/// The `AuthorizationPolicy` takes care to authenticate your calls to Azure CosmosDB. Currently it
/// supports two type of authorization: one at service level and another at resource level (see
/// [AuthorizationToken] for more info). The policy must be added just before the transport policy
/// The `AuthorizationPolicy` takes care to authenticate your calls to Azure CosmosDB.
///
/// Currently it supports two type of authorization: one at service level and another at resource level (see
/// [`AuthorizationToken`] for more info). The policy must be added just before the transport policy
/// because it needs to inspect the values that are about to be sent to the transport and inject
/// the proper authorization token.
/// The `AuthorizationPolicy` is the only owner of the passed credentials so if you want to
/// authenticate the same operation with different credentials all you have to do is to swap the
///
/// The `AuthorizationPolicy` is the only owner of the passed credentials, so if you want to
/// authenticate the same operation with different credentials, all you have to do is to swap the
/// `AuthorizationPolicy`.
/// This struct is `Debug` but secrets are encrypted by `AuthorizationToken` so there is no risk of
///
/// This struct implements `Debug` but secrets are encrypted by `AuthorizationToken` so there is no risk of
/// leaks in debug logs (secrets are stored in cleartext in memory: dumps are still leaky).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthorizationPolicy {
Expand Down Expand Up @@ -54,11 +57,11 @@ impl Policy for AuthorizationPolicy {

let time_nonce = TimeNonce::new();

let uri_path = &request.uri().path_and_query().unwrap().to_string()[1..];
let uri_path = request.path_and_query();
trace!("uri_path used by AuthorizationPolicy == {:#?}", uri_path);

let auth = {
let resource_link = generate_resource_link(uri_path);
let resource_link = generate_resource_link(&uri_path);
trace!("resource_link == {}", resource_link);
generate_authorization(
&self.authorization_token,
Expand Down Expand Up @@ -100,9 +103,7 @@ impl Policy for AuthorizationPolicy {
/// 2. Find if the uri **is** the ending string (without the leading slash). If so return an empty
/// string. This covers the exception of the rule above.
/// 3. Return the received uri unchanged.
// TODO: will become private as soon as client will be migrated
// to pipeline arch.
pub(crate) fn generate_resource_link(uri: &str) -> &str {
fn generate_resource_link(uri: &str) -> &str {
static ENDING_STRINGS: &[&str] = &[
"/dbs",
"/colls",
Expand Down Expand Up @@ -139,12 +140,11 @@ pub(crate) fn generate_resource_link(uri: &str) -> &str {
}
}

/// The CosmosDB authorization can either be "primary" (ie one of the two service-level tokens) or
/// "resource" (ie a single database). In the first case the signature must be constructed by
/// The CosmosDB authorization can either be "primary" (i.e., one of the two service-level tokens) or
/// "resource" (i.e., a single database). In the first case the signature must be constructed by
/// signing the HTTP method, resource type, resource link (the relative URI) and the current time.
/// In the second case, the signature is just the resource key.
// TODO: make it private after pipeline migration
pub(crate) fn generate_authorization(
fn generate_authorization(
auth_token: &AuthorizationToken,
http_method: &http::Method,
resource_type: &ResourceType,
Expand Down
4 changes: 2 additions & 2 deletions sdk/identity/src/client_credentials_flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use azure_core::{
use http::Method;
use login_response::LoginResponse;
use std::sync::Arc;
use url::form_urlencoded;
use url::{form_urlencoded, Url};

/// Perform the client credentials flow
#[allow(clippy::manual_async_fn)]
Expand All @@ -66,7 +66,7 @@ pub async fn perform(
.append_pair("grant_type", "client_credentials")
.finish();

let url = Request::parse_uri(&format!(
let url = Url::parse(&format!(
"https://login.microsoftonline.com/{}/oauth2/v2.0/token",
tenant_id
))
Expand Down
4 changes: 2 additions & 2 deletions sdk/identity/src/device_code_flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use http::Method;
use oauth2::ClientId;
use serde::Deserialize;
use std::{borrow::Cow, sync::Arc, time::Duration};
use url::form_urlencoded;
use url::{form_urlencoded, Url};

/// Start the device authorization grant flow.
/// The user has only 15 minutes to sign in (the usual value for expires_in).
Expand Down Expand Up @@ -171,7 +171,7 @@ async fn post_form(
url: &str,
form_body: String,
) -> Result<Response> {
let url = Request::parse_uri(url)?;
let url = Url::parse(url)?;
let mut req = Request::new(url, Method::POST);
req.headers_mut().insert(
headers::CONTENT_TYPE,
Expand Down
4 changes: 2 additions & 2 deletions sdk/identity/src/refresh_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use oauth2::{AccessToken, ClientId, ClientSecret};
use serde::Deserialize;
use std::fmt;
use std::sync::Arc;
use url::form_urlencoded;
use url::{form_urlencoded, Url};

/// Exchange a refresh token for a new access token and refresh token
#[allow(clippy::manual_async_fn)]
Expand All @@ -34,7 +34,7 @@ pub async fn exchange(
let encoded = encoded.append_pair("refresh_token", refresh_token.secret());
let encoded = encoded.finish();

let url = Request::parse_uri(&format!(
let url = Url::parse(&format!(
"https://login.microsoftonline.com/{}/oauth2/v2.0/token",
tenant_id
))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ impl TokenCredential for ImdsManagedIdentityCredential {
"error parsing url for MSI endpoint",
)?;

let url = Request::parse_uri(url.as_str())?;
let mut req = Request::new(url, Method::GET);

req.headers_mut().insert("Metadata", "true");
Expand Down
5 changes: 4 additions & 1 deletion sdk/storage/examples/account00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ async fn main() -> Result<()> {
StorageAccountClient::new_access_key(http_client.clone(), &account, &master_key)
.as_storage_client();

let response = storage_client.get_account_information().execute().await?;
let response = storage_client
.get_account_information()
.into_future()
.await?;
println!("{:?}", response);

Ok(())
Expand Down
1 change: 1 addition & 0 deletions sdk/storage/src/account/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod operations;
pub mod requests;
pub mod responses;

Expand Down
87 changes: 87 additions & 0 deletions sdk/storage/src/account/operations/get_account_information.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use crate::core::prelude::*;
use azure_core::error::Result;
use azure_core::headers::{
account_kind_from_headers, date_from_headers, request_id_from_headers, sku_name_from_headers,
};

use azure_core::{Context, RequestId};
use chrono::{DateTime, Utc};
use http::HeaderMap;

#[derive(Debug, Clone)]
pub struct GetAccountInformationBuilder {
storage_client: StorageClient,
context: Context,
}

impl GetAccountInformationBuilder {
pub(crate) fn new(storage_client: StorageClient) -> Self {
Self {
storage_client,
context: Context::new(),
}
}

setters! {
context: Context => context,
}

pub fn into_future(mut self) -> GetAccountInformation {
Box::pin(async move {
let mut request = self
.storage_client
.storage_account_client()
.blob_storage_request("", http::Method::GET);

for (k, v) in [("restype", "account"), ("comp", "properties")].iter() {
request.url_mut().query_pairs_mut().append_pair(k, v);
}

let response = self
.storage_client
.storage_account_client()
.pipeline()
.send(&mut self.context, &mut request)
.await?;

GetAccountInformationResponse::try_from(response.headers())
})
}
}

/// The future returned by calling `into_future` on the builder.
pub type GetAccountInformation =
futures::future::BoxFuture<'static, azure_core::error::Result<GetAccountInformationResponse>>;

#[cfg(feature = "into_future")]
impl std::future::IntoFuture for GetAccountInformationBuilder {
type IntoFuture = GetAccountInformation;
type Output = <GetAccountInformation as std::future::Future>::Output;
fn into_future(self) -> Self::IntoFuture {
Self::into_future(self)
}
}

#[derive(Debug, Clone)]
pub struct GetAccountInformationResponse {
pub request_id: RequestId,
pub date: DateTime<Utc>,
pub sku_name: String,
pub account_kind: String,
}

impl GetAccountInformationResponse {
pub(crate) fn try_from(headers: &HeaderMap) -> Result<GetAccountInformationResponse> {
let request_id = request_id_from_headers(headers)?;
let date = date_from_headers(headers)?;
let sku_name = sku_name_from_headers(headers)?;
let account_kind = account_kind_from_headers(headers)?;

Ok(GetAccountInformationResponse {
request_id,
date,
sku_name,
account_kind,
})
}
}
3 changes: 3 additions & 0 deletions sdk/storage/src/account/operations/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod get_account_information;

pub use get_account_information::*;
Loading