diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index f0ae8cde05a9..4af8f01b68fc 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use async_trait::async_trait; use http::StatusCode; +use serde::Deserialize; use super::core::DropboxCore; use super::error::parse_error; -use super::response::DropboxMetadataResponse; use super::writer::DropboxWriter; use crate::raw::*; use crate::*; @@ -163,3 +163,57 @@ impl Accessor for DropboxBackend { } } } + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataResponse { + #[serde(rename(deserialize = ".tag"))] + pub tag: String, + pub client_modified: String, + pub content_hash: Option, + pub file_lock_info: Option, + pub has_explicit_shared_members: Option, + pub id: String, + pub is_downloadable: Option, + pub name: String, + pub path_display: String, + pub path_lower: String, + pub property_groups: Option>, + pub rev: Option, + pub server_modified: Option, + pub sharing_info: Option, + pub size: Option, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataFileLockInfo { + pub created: Option, + pub is_lockholder: bool, + pub lockholder_name: Option, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataPropertyGroup { + pub fields: Vec, + pub template_id: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataPropertyGroupField { + pub name: String, + pub value: String, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxMetadataSharingInfo { + pub modified_by: Option, + pub parent_shared_folder_id: Option, + pub read_only: Option, + pub shared_folder_id: Option, + pub traverse_only: Option, + pub no_access: Option, +} diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs index 373c83a9ce80..2827d86a9b72 100644 --- a/core/src/services/dropbox/builder.rs +++ b/core/src/services/dropbox/builder.rs @@ -20,8 +20,13 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use chrono::DateTime; +use chrono::Utc; +use tokio::sync::Mutex; + use super::backend::DropboxBackend; use super::core::DropboxCore; +use super::core::DropboxSigner; use crate::raw::*; use crate::*; @@ -72,8 +77,12 @@ use crate::*; #[derive(Default)] pub struct DropboxBuilder { - access_token: Option, root: Option, + access_token: Option, + refresh_token: Option, + client_id: Option, + client_secret: Option, + http_client: Option, } @@ -84,15 +93,47 @@ impl Debug for DropboxBuilder { } impl DropboxBuilder { - /// default: no access token, which leads to failure + /// Set the root directory for dropbox. + /// + /// Default to `/` if not set. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = Some(root.to_string()); + self + } + + /// Access token is used for temporary access to the Dropbox API. + /// + /// You can get the access token from [Dropbox App Console](https://www.dropbox.com/developers/apps) + /// + /// NOTE: this token will be expired in 4 hours. If you are trying to use dropbox services in a long time, please set a refresh_token instead. pub fn access_token(&mut self, access_token: &str) -> &mut Self { self.access_token = Some(access_token.to_string()); self } - /// default: no root path, which leads to failure - pub fn root(&mut self, root: &str) -> &mut Self { - self.root = Some(root.to_string()); + /// Refersh token is used for long term access to the Dropbox API. + /// + /// You can get the refresh token via OAuth2.0 Flow of dropbox. + /// + /// OpenDAL will use this refresh token to get a new access token when the old one is expired. + pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self { + self.refresh_token = Some(refresh_token.to_string()); + self + } + + /// Set the client id for dropbox. + /// + /// This is required for OAuth2.0 Flow with refresh token. + pub fn client_id(&mut self, client_id: &str) -> &mut Self { + self.client_id = Some(client_id.to_string()); + self + } + + /// Set the client secret for dropbox. + /// + /// This is required for OAuth2.0 Flow with refresh token. + pub fn client_secret(&mut self, client_secret: &str) -> &mut Self { + self.client_secret = Some(client_secret.to_string()); self } @@ -116,6 +157,9 @@ impl Builder for DropboxBuilder { let mut builder = Self::default(); map.get("root").map(|v| builder.root(v)); map.get("access_token").map(|v| builder.access_token(v)); + map.get("refresh_token").map(|v| builder.refresh_token(v)); + map.get("client_id").map(|v| builder.client_id(v)); + map.get("client_secret").map(|v| builder.client_secret(v)); builder } @@ -129,20 +173,57 @@ impl Builder for DropboxBuilder { .with_context("service", Scheme::Dropbox) })? }; - let token = match self.access_token.clone() { - Some(access_token) => access_token, - None => { + + let signer = match (self.access_token.take(), self.refresh_token.take()) { + (Some(access_token), None) => DropboxSigner { + access_token, + // We will never expire user specified token. + expires_in: DateTime::::MAX_UTC, + ..Default::default() + }, + (None, Some(refresh_token)) => { + let client_id = self.client_id.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "client_id must be set when refresh_token is set", + ) + .with_context("service", Scheme::Dropbox) + })?; + let client_secret = self.client_secret.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "client_secret must be set when refresh_token is set", + ) + .with_context("service", Scheme::Dropbox) + })?; + + DropboxSigner { + refresh_token, + client_id, + client_secret, + ..Default::default() + } + } + (Some(_), Some(_)) => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "access_token and refresh_token can not be set at the same time", + ) + .with_context("service", Scheme::Dropbox)) + } + (None, None) => { return Err(Error::new( ErrorKind::ConfigInvalid, - "access_token is required", - )) + "access_token or refresh_token must be set", + ) + .with_context("service", Scheme::Dropbox)) } }; Ok(DropboxBackend { core: Arc::new(DropboxCore { root, - token, + signer: Arc::new(Mutex::new(signer)), client, }), }) diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index 790d69fcdea3..bbb2e5208012 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -18,16 +18,22 @@ use std::default::Default; use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::Arc; use bytes::Bytes; +use chrono::DateTime; +use chrono::Utc; use http::header; -use http::request::Builder; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; use http::Request; use http::Response; use serde::Deserialize; use serde::Serialize; +use tokio::sync::Mutex; use crate::raw::build_rooted_abs_path; +use crate::raw::new_json_deserialize_error; use crate::raw::new_json_serialize_error; use crate::raw::new_request_build_error; use crate::raw::AsyncBody; @@ -36,7 +42,7 @@ use crate::raw::IncomingAsyncBody; use crate::types::Result; pub struct DropboxCore { - pub token: String, + pub signer: Arc>, pub client: HttpClient, pub root: String, } @@ -63,12 +69,13 @@ impl DropboxCore { }; let request_payload = serde_json::to_string(&download_args).map_err(new_json_serialize_error)?; - let request = self - .build_auth_header(Request::post(&url)) + let mut request = Request::post(&url) .header("Dropbox-API-Arg", request_payload) - .header(header::CONTENT_LENGTH, 0) + .header(CONTENT_LENGTH, 0) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; + + self.sign(&mut request).await?; self.client.send(request).await } @@ -86,15 +93,14 @@ impl DropboxCore { }; let mut request_builder = Request::post(&url); if let Some(size) = size { - request_builder = request_builder.header(header::CONTENT_LENGTH, size); + request_builder = request_builder.header(CONTENT_LENGTH, size); } request_builder = request_builder.header( - header::CONTENT_TYPE, + CONTENT_TYPE, content_type.unwrap_or("application/octet-stream"), ); - let request = self - .build_auth_header(request_builder) + let mut request = request_builder .header( "Dropbox-API-Arg", serde_json::to_string(&args).map_err(new_json_serialize_error)?, @@ -102,6 +108,7 @@ impl DropboxCore { .body(body) .map_err(new_request_build_error)?; + self.sign(&mut request).await?; self.client.send(request).await } @@ -113,12 +120,13 @@ impl DropboxCore { let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); - let request = self - .build_auth_header(Request::post(&url)) - .header(header::CONTENT_TYPE, "application/json") - .header(header::CONTENT_LENGTH, bs.len()) + let mut request = Request::post(&url) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error)?; + + self.sign(&mut request).await?; self.client.send(request).await } @@ -130,12 +138,13 @@ impl DropboxCore { let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); - let request = self - .build_auth_header(Request::post(&url)) - .header(header::CONTENT_TYPE, "application/json") - .header(header::CONTENT_LENGTH, bs.len()) + let mut request = Request::post(&url) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error)?; + + self.sign(&mut request).await?; self.client.send(request).await } @@ -148,19 +157,87 @@ impl DropboxCore { let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); - let request = self - .build_auth_header(Request::post(&url)) - .header(header::CONTENT_TYPE, "application/json") - .header(header::CONTENT_LENGTH, bs.len()) + let mut request = Request::post(&url) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error)?; + + self.sign(&mut request).await?; + self.client.send(request).await } - fn build_auth_header(&self, mut req: Builder) -> Builder { - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - req + pub async fn sign(&self, req: &mut Request) -> Result<()> { + let mut signer = self.signer.lock().await; + + // Access token is valid, use it directly. + if !signer.access_token.is_empty() && signer.expires_in > Utc::now() { + let value = format!("Bearer {}", signer.access_token) + .parse() + .expect("token must be valid header"); + req.headers_mut().insert(header::AUTHORIZATION, value); + return Ok(()); + } + + // Refresh invalid token. + let url = "https://api.dropboxapi.com/oauth2/token".to_string(); + + let content = format!( + "grant_type=refresh_token&refresh_token={}&client_id={}&client_secret={}", + signer.refresh_token, signer.client_id, signer.client_secret + ); + let bs = Bytes::from(content); + + let request = Request::post(&url) + .header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, bs.len()) + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error)?; + + let resp = self.client.send(request).await?; + let body = resp.into_body().bytes().await?; + + let token: DropboxTokenResponse = + serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + + // Update signer after token refreshed. + + signer.access_token = token.access_token.clone(); + + // Refresh it 2 minutes earlier. + signer.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in as i64) + - chrono::Duration::seconds(120); + + let value = format!("Bearer {}", token.access_token) + .parse() + .expect("token must be valid header"); + req.headers_mut().insert(header::AUTHORIZATION, value); + + Ok(()) + } +} + +#[derive(Clone)] +pub struct DropboxSigner { + pub client_id: String, + pub client_secret: String, + pub refresh_token: String, + + pub access_token: String, + pub expires_in: DateTime, +} + +impl Default for DropboxSigner { + fn default() -> Self { + DropboxSigner { + refresh_token: "".to_string(), + client_id: String::new(), + client_secret: String::new(), + + access_token: "".to_string(), + expires_in: DateTime::::MIN_UTC, + } } } @@ -178,6 +255,18 @@ struct DropboxUploadArgs { strict_conflict: bool, } +impl Default for DropboxUploadArgs { + fn default() -> Self { + DropboxUploadArgs { + mode: "overwrite".to_string(), + path: "".to_string(), + mute: true, + autorename: false, + strict_conflict: false, + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] struct DropboxDeleteArgs { path: String, @@ -196,18 +285,6 @@ struct DropboxMetadataArgs { path: String, } -impl Default for DropboxUploadArgs { - fn default() -> Self { - DropboxUploadArgs { - mode: "overwrite".to_string(), - path: "".to_string(), - mute: true, - autorename: false, - strict_conflict: false, - } - } -} - impl Default for DropboxMetadataArgs { fn default() -> Self { DropboxMetadataArgs { @@ -218,3 +295,9 @@ impl Default for DropboxMetadataArgs { } } } + +#[derive(Clone, Deserialize)] +struct DropboxTokenResponse { + access_token: String, + expires_in: usize, +} diff --git a/core/src/services/dropbox/error.rs b/core/src/services/dropbox/error.rs index 5e5831b15f76..57e3c177bb8a 100644 --- a/core/src/services/dropbox/error.rs +++ b/core/src/services/dropbox/error.rs @@ -17,19 +17,26 @@ use http::Response; use http::StatusCode; +use http::Uri; +use serde::Deserialize; -use super::response::DropboxErrorResponse; use crate::raw::*; use crate::Error; use crate::ErrorKind; use crate::Result; +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxErrorResponse { + error_summary: String, +} + /// Parse error response into Error. pub async fn parse_error(resp: Response) -> Result { let (parts, body) = resp.into_parts(); let bs = body.bytes().await?; - let (kind, retryable) = match parts.status { + let (mut kind, mut retryable) = match parts.status { StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), StatusCode::INTERNAL_SERVER_ERROR @@ -39,53 +46,39 @@ pub async fn parse_error(resp: Response) -> Result { _ => (ErrorKind::Unexpected, false), }; - let dropbox_error = - serde_json::from_slice::(&bs).map_err(new_json_deserialize_error); - match dropbox_error { - Ok(dropbox_error) => { - // We cannot get the error type from the response header when the status code is 409. - // Because Dropbox API v2 will put error summary in the response body, - // we need to parse it to get the correct error type and then error kind. - // See https://www.dropbox.com/developers/documentation/http/documentation#error-handling - let error_summary = dropbox_error.error_summary.as_str(); + let (message, dropbox_err) = serde_json::from_slice::(&bs) + .map(|dropbox_err| (format!("{dropbox_err:?}"), Some(dropbox_err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); - let mut err = Error::new( - match parts.status { - // 409 Conflict means that Endpoint-specific error. - // Look to the JSON response body for the specifics of the error. - StatusCode::CONFLICT => { - if error_summary.contains("path/not_found") - || error_summary.contains("path_lookup/not_found") - { - ErrorKind::NotFound - } else if error_summary.contains("path/conflict") { - ErrorKind::AlreadyExists - } else { - ErrorKind::Unexpected - } - } - // Otherwise, we can get the error type from the response status code. - _ => kind, - }, - error_summary, - ) - .with_context("response", format!("{parts:?}")); + if let Some(dropbox_err) = dropbox_err { + (kind, retryable) = + parse_dropbox_error_summary(&dropbox_err.error_summary).unwrap_or((kind, retryable)); + } - if retryable { - err = err.set_temporary(); - } + let mut err = Error::new(kind, &message).with_context("response", format!("{parts:?}")); - Ok(err) - } - Err(_err) => { - let mut err = Error::new(kind, &String::from_utf8_lossy(&bs)) - .with_context("response", format!("{parts:?}")); + if retryable { + err = err.set_temporary(); + } - if retryable { - err = err.set_temporary(); - } + if let Some(uri) = parts.extensions.get::() { + err = err.with_context("uri", uri.to_string()); + } + + Ok(err) +} - Ok(err) - } +/// We cannot get the error type from the response header when the status code is 409. +/// Because Dropbox API v2 will put error summary in the response body, +/// we need to parse it to get the correct error type and then error kind. +/// +/// See +pub fn parse_dropbox_error_summary(summary: &str) -> Option<(ErrorKind, bool)> { + if summary.starts_with("path/not_found") || summary.starts_with("path_lookup/not_found") { + Some((ErrorKind::NotFound, false)) + } else if summary.starts_with("path/conflict") { + Some((ErrorKind::AlreadyExists, false)) + } else { + None } } diff --git a/core/src/services/dropbox/mod.rs b/core/src/services/dropbox/mod.rs index 048b7d74d4bf..239974a75d8c 100644 --- a/core/src/services/dropbox/mod.rs +++ b/core/src/services/dropbox/mod.rs @@ -19,7 +19,6 @@ mod backend; mod builder; mod core; mod error; -mod response; mod writer; pub use builder::DropboxBuilder as Dropbox; diff --git a/core/src/services/dropbox/response.rs b/core/src/services/dropbox/response.rs deleted file mode 100644 index 2e5c15a81cbe..000000000000 --- a/core/src/services/dropbox/response.rs +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use serde::Deserialize; - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxErrorResponse { - pub error_summary: String, - pub error: DropboxErrorDetail, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxErrorDetail { - #[serde(rename(deserialize = ".tag"))] - pub tag: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataResponse { - #[serde(rename(deserialize = ".tag"))] - pub tag: String, - pub client_modified: String, - pub content_hash: Option, - pub file_lock_info: Option, - pub has_explicit_shared_members: Option, - pub id: String, - pub is_downloadable: Option, - pub name: String, - pub path_display: String, - pub path_lower: String, - pub property_groups: Option>, - pub rev: Option, - pub server_modified: Option, - pub sharing_info: Option, - pub size: Option, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataFileLockInfo { - pub created: Option, - pub is_lockholder: bool, - pub lockholder_name: Option, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataPropertyGroup { - pub fields: Vec, - pub template_id: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataPropertyGroupField { - pub name: String, - pub value: String, -} - -#[derive(Default, Debug, Deserialize)] -#[serde(default)] -pub struct DropboxMetadataSharingInfo { - pub modified_by: Option, - pub parent_shared_folder_id: Option, - pub read_only: Option, - pub shared_folder_id: Option, - pub traverse_only: Option, - pub no_access: Option, -}