From e55769df1d04cafd824754bd6801a935a2f7e143 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 26 Jan 2025 00:34:59 +0100 Subject: [PATCH] refactor: improve HTTP client and documentation - Simplify HTTP client response handling - Add explicit text/json content type methods - Add HTTP status code verification - Fix JWT logout on interrupt - Prevent infinite loops during JWT renewal - Remove credentials from documentation examples This improves error handling robustness and security practices while making the HTTP client interface cleaner and more explicit. --- CHANGELOG.md | 2 + src/gateway.rs | 104 +++++++++++++++++++------------------------------ src/http.rs | 70 ++++++++++++++++++++++++++------- src/remote.rs | 46 ++++++++++++---------- src/track.rs | 22 +++++------ 5 files changed, 132 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index baa8fcf..080ce7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/). - [decoder] Always use accurate seeking mode for reliable position reporting - [decoder] Fix logical error in `size_hint()` lower bound calculation - [decoder] Remove `ExactSizeIterator` implementation as total samples can't be determined exactly +- [http] Simplified HTTP client response handling and content type management +- [http] Added status code checking in HTTP client responses - [player] Improve seek logging with more detailed timestamps and progress information - [remote] Improve network timeout handling and error messages diff --git a/src/gateway.rs b/src/gateway.rs index 4e54d5c..26e9548 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -39,6 +39,8 @@ //! //! # Example //! +//! # Example +//! //! ```rust //! use pleezer::gateway::Gateway; //! @@ -48,11 +50,10 @@ //! let arl = gateway.oauth("user@example.com", "password").await?; //! gateway.login_with_arl(&arl).await?; //! -//! // Or use existing ARL -//! gateway.login_with_arl(&existing_arl).await?; -//! -//! // Session automatically renews -//! gateway.refresh().await?; +//! // Make authenticated requests +//! let songs = gateway.list_to_queue(&track_list).await?; +//! let recommendations = gateway.user_radio(user_id).await?; +//! let user_data = gateway.refresh().await?; //! ``` use std::time::SystemTime; @@ -62,8 +63,7 @@ use futures_util::TryFutureExt; use md5::{Digest, Md5}; use reqwest::{ self, - header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}, - Body, + header::{HeaderMap, HeaderValue, AUTHORIZATION}, }; use serde::Deserialize; use url::Url; @@ -182,12 +182,6 @@ impl Gateway { /// Returns access token on successful authentication. const OAUTH_LOGIN_URL: &'static str = "https://connect.deezer.com/oauth/user_auth.php"; - /// Content type for gateway requests. - /// - /// Although the bodies of all gateway requests are JSON, the - /// `Content-Type` is not. - const PLAIN_TEXT_CONTENT: HeaderValue = HeaderValue::from_static("text/plain;charset=UTF-8"); - /// Default empty JSON body for requests. /// /// Used when a request requires a body but has no parameters. @@ -377,6 +371,7 @@ impl Gateway { /// Returns an error if: /// * URL construction fails /// * Network request fails + /// * HTTP status code is not successful (not 2xx) /// * Response isn't valid JSON /// * Response can't be parsed as type T pub async fn request( @@ -404,19 +399,17 @@ impl Gateway { self.client_id, ); let url = url_str.parse::()?; - let mut request = self.http_client.post(url, body); - - let request_headers = request.headers_mut(); - request_headers.try_insert(CONTENT_TYPE, Self::PLAIN_TEXT_CONTENT)?; - // Add any headers that were passed in. + // Although the bodies of all gateway requests are JSON, the + // `Content-Type` is not. + let mut request = self.http_client.text(url, body); if let Some(headers) = headers { - request_headers.extend(headers); + // Add any headers that were passed in. + request.headers_mut().extend(headers); } let response = self.http_client.execute(request).await?; let body = response.text().await?; - protocol::json(&body, T::METHOD) } @@ -725,7 +718,7 @@ impl Gateway { // First get a session ID. The response can be ignored because the // session ID is stored in the cookie store. let request = self.http_client.get(Url::parse(Self::OAUTH_SID_URL)?, ""); - let _ = self.http_client.execute(request).await?; + self.http_client.execute(request).await?; // Then login and get an access token. let query = Url::parse(&format!( @@ -744,47 +737,6 @@ impl Gateway { self.get_arl(&result.access_token).await } - /// Performs JWT authentication request. - /// - /// Internal helper for JWT operations that: - /// * Formats request URL with parameters - /// * Sends authenticated request - /// * Handles response and cookies - /// - /// # Arguments - /// - /// * `endpoint` - JWT endpoint path - /// * `body` - Request body content - /// - /// # Errors - /// - /// Returns error if: - /// * URL construction fails - /// * Network request fails - /// * Authentication fails - async fn jwt_auth(&mut self, endpoint: &str, body: T) -> Result<()> - where - T: Into, - { - // `c` for cookie (headers), `p` for payload (body) - let query = Url::parse(&format!( - "{}{}?jo=p&rto=c&i=p", - Self::JWT_AUTH_URL, - endpoint - ))?; - - let request = self.http_client.post(query, body); - let response = self.http_client.execute(request).await?; - - if !response.status().is_success() { - let message = response.text().await?; - return Err(Error::permission_denied(message)); - } - - // When successful, the `refresh-token` cookie is set within the HTTP client's cookie store. - Ok(()) - } - /// Authenticates using JWT and ARL token. /// /// Establishes a persistent session using: @@ -804,12 +756,23 @@ impl Gateway { /// * Network request fails /// * Authentication fails pub async fn login_with_arl(&mut self, arl: &Arl) -> Result<()> { + // `c` for cookie (headers), `p` for payload (body) + let query = Url::parse(&format!( + "{}{}?jo=p&rto=c&i=p", + Self::JWT_AUTH_URL, + Self::JWT_ENDPOINT_LOGIN + ))?; + let auth = auth::Jwt { arl: arl.to_string(), account_id: self.user_token().await?.user_id.to_string(), }; - self.jwt_auth(Self::JWT_ENDPOINT_LOGIN, serde_json::to_string(&auth)?) - .await + + let request = self.http_client.json(query, serde_json::to_string(&auth)?); + self.http_client.execute(request).await?; + + // When successful, the `refresh-token` cookie is set within the HTTP client's cookie store. + Ok(()) } /// Renews the current login session. @@ -824,7 +787,18 @@ impl Gateway { /// * Network request fails /// * Token renewal fails pub async fn renew_login(&mut self) -> Result<()> { - self.jwt_auth(Self::JWT_ENDPOINT_RENEW, "").await + // `c` for cookie (headers), `p` for payload (body) + let query = Url::parse(&format!( + "{}{}?jo=p&rto=c&i=c", + Self::JWT_AUTH_URL, + Self::JWT_ENDPOINT_RENEW + ))?; + + let request = self.http_client.json(query, Self::EMPTY_JSON_OBJECT); + self.http_client.execute(request).await?; + + // When successful, the `refresh-token` cookie is set within the HTTP client's cookie store. + Ok(()) } /// Logs out and invalidates the current session. diff --git a/src/http.rs b/src/http.rs index 7542658..4578e86 100644 --- a/src/http.rs +++ b/src/http.rs @@ -41,17 +41,19 @@ //! // Or without session management for public endpoints //! let client = Client::without_cookies(&config)?; //! -//! // Make authenticated requests -//! let request = client.post(auth_url, credentials); -//! let response = client.execute(request).await?; +//! // Make API requests with appropriate content type +//! let request = client.json(api_url, track_data); // for JSON data +//! // or +//! let request = client.text(api_url, metadata); // for plain text data +//! let response = client.execute(request).await?; // checks HTTP status code //! //! // Cookies are automatically managed for session persistence //! ``` -use std::{future::Future, num::NonZeroU32, sync::Arc, time::Duration}; +use std::{num::NonZeroU32, sync::Arc, time::Duration}; -use futures_util::{FutureExt, TryFutureExt}; use governor::{DefaultDirectRateLimiter, Quota}; +use http::header::CONTENT_TYPE; use reqwest::{ self, header::{HeaderValue, ACCEPT_LANGUAGE}, @@ -66,6 +68,7 @@ use crate::{config::Config, error::Result}; /// * Cookie-based session persistence /// * Rate limiting for API quotas /// * Consistent configuration +// TODO: implement builder pattern pub struct Client { /// Unlimited request client for special cases. /// @@ -117,6 +120,16 @@ impl Client { /// * Maintain responsive streaming const READ_TIMEOUT: Duration = Duration::from_secs(2); + /// Content type for plain text requests. + /// + /// Used by `text()` method to set Content-Type header to "text/plain;charset=UTF-8" + const CONTENT_TYPE_TEXT: HeaderValue = HeaderValue::from_static("text/plain;charset=UTF-8"); + + /// Content type for JSON requests. + /// + /// Used by `json()` method to set Content-Type header to "application/json" + const CONTENT_TYPE_JSON: HeaderValue = HeaderValue::from_static("application/json"); + /// Creates a new client with optional session management. /// /// # Arguments @@ -257,7 +270,7 @@ impl Client { request } - /// Builds a POST request. + /// Builds a POST request with plain text body. /// /// Convenience method for `request()` with POST method. /// @@ -266,12 +279,37 @@ impl Client { /// * `url` - Request URL /// * `body` - Request body content #[inline] - pub fn post(&self, url: U, body: T) -> reqwest::Request + pub fn text(&self, url: U, body: T) -> reqwest::Request where U: Into, T: Into, { - self.request(Method::POST, url, body) + let mut request = self.request(Method::POST, url, body); + request + .headers_mut() + .insert(CONTENT_TYPE, Self::CONTENT_TYPE_TEXT); + request + } + + /// Builds a POST request with JSON body. + /// + /// Convenience method for `request()` with POST method. + /// + /// # Arguments + /// + /// * `url` - Request URL + /// * `body` - Request body content + #[inline] + pub fn json(&self, url: U, body: T) -> reqwest::Request + where + U: Into, + T: Into, + { + let mut request = self.request(Method::POST, url, body); + request + .headers_mut() + .insert(CONTENT_TYPE, Self::CONTENT_TYPE_JSON); + request } /// Builds a GET request. @@ -294,7 +332,8 @@ impl Client { /// Executes a request with rate limiting. /// /// Applies rate limiting before executing the request to - /// comply with API quotas. + /// comply with API quotas. Automatically verifies that the response + /// has a successful HTTP status code (2xx range). /// /// # Arguments /// @@ -305,14 +344,15 @@ impl Client { /// Returns error if: /// * Rate limiting fails /// * Request execution fails + /// * Response status code is not successful (not 2xx) /// * Network error occurs - pub fn execute( - &self, - request: reqwest::Request, - ) -> impl Future> + '_ { + pub async fn execute(&self, request: reqwest::Request) -> Result { // No need to await with jitter because the level of concurrency is low. // TODO : use different rate limiter for each host. - let throttle = self.rate_limiter.until_ready(); - throttle.then(|()| self.unlimited.execute(request).map_err(Into::into)) + self.rate_limiter.until_ready().await; + match self.unlimited.execute(request).await { + Ok(response) => response.error_for_status().map_err(Into::into), + Err(e) => Err(e.into()), + } } } diff --git a/src/remote.rs b/src/remote.rs index 384aa5c..76ebbfe 100644 --- a/src/remote.rs +++ b/src/remote.rs @@ -625,6 +625,8 @@ impl Client { Ok(inner) => { if let Err(e) = inner { warn!("jwt login failed: {e}"); + } else { + debug!("jwt logged in"); } } Err(e) => warn!("jwt login timed out: {e}"), @@ -643,7 +645,7 @@ impl Client { self.user_token = Some(user_token); // Decorate the websocket request with the same cookies as the gateway. - let (cookie_str, login_ttl) = self.cookie_str_ttl(); + let (cookie_str, new_ttl) = self.cookie_str_ttl(); request = request.with_header(http::header::COOKIE.as_str(), cookie_str); // Set timer for user token expiration. Wake a short while before @@ -654,9 +656,9 @@ impl Client { // The user token expiration is much longer than the cookie expiration. // We need to regularly refresh the cookie to keep the user token alive. + let mut login_ttl = new_ttl.saturating_sub(Self::TOKEN_EXPIRATION_THRESHOLD); debug!("login time to live: {:.0}s", login_ttl.as_secs_f32().ceil()); - let login_ttl = login_ttl.saturating_sub(Self::TOKEN_EXPIRATION_THRESHOLD); - let login_expiry = tokio::time::sleep(login_ttl); + let login_expiry = tokio::time::sleep(Duration::from_secs(4)); tokio::pin!(login_expiry); let config = Some( @@ -707,17 +709,15 @@ impl Client { } () = &mut login_expiry => { - debug!("refreshing login"); // Soft failure: JWT logins are not required to interact with the gateway. match tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.renew_login()).await { Ok(inner) => { match inner { Ok(()) => { - let (_, login_ttl) = self.cookie_str_ttl(); - if let Some(deadline) = tokio::time::Instant::now().checked_add(login_ttl) { - debug!("login time to live: {:.0}s", login_ttl.as_secs_f32().ceil()); - login_expiry.as_mut().reset(deadline); - } } + debug!("jwt renewed"); + let (_, new_ttl) = self.cookie_str_ttl(); + login_ttl = new_ttl.saturating_sub(Self::TOKEN_EXPIRATION_THRESHOLD); + } Err(e) => { error!("jwt renewal failed: {e}"); } @@ -725,6 +725,11 @@ impl Client { } Err(e) => warn!("jwt login timed out: {e}"), } + + debug!("login time to live: {:.0}s", login_ttl.as_secs_f32().ceil()); + if let Some(deadline) = tokio::time::Instant::now().checked_add(login_ttl) { + login_expiry.as_mut().reset(deadline); + } } Some(token_ttl) = self.time_to_live_rx.recv() => { @@ -772,17 +777,6 @@ impl Client { }; self.stop().await; - - // Soft failure: JWT logins are not required to interact with the gateway. - match tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.logout()).await { - Ok(inner) => { - if let Err(e) = inner { - warn!("jwt logout failed: {e}"); - } - } - Err(e) => warn!("jwt logout timed out: {e}"), - } - loop_result } @@ -1013,6 +1007,18 @@ impl Client { self.subscriptions.remove(&ident); } } + + // Soft failure: JWT logins are not required to interact with the gateway. + match tokio::time::timeout(Self::NETWORK_TIMEOUT, self.gateway.logout()).await { + Ok(inner) => { + if let Err(e) = inner { + warn!("jwt logout failed: {e}"); + } else { + debug!("jwt logged out"); + } + } + Err(e) => warn!("jwt logout timed out: {e}"), + } } /// Creates a message targeted at a specific device. diff --git a/src/track.rs b/src/track.rs index 66f5543..e9dbe48 100644 --- a/src/track.rs +++ b/src/track.rs @@ -75,8 +75,9 @@ //! // Create track from gateway data //! let mut track = Track::from(track_data); //! -//! // Get media source -//! let medium = track.get_medium(&client, &media_url, quality, license_token).await?; +//! // Get media source with proper content type for track metadata +//! let request = client.json(media_url, track_data); +//! let response = client.execute(request).await?; //! //! // Start download //! track.start_download(&client, &medium).await?; @@ -94,7 +95,6 @@ use std::{ time::{Duration, SystemTime}, }; -use reqwest::header::{HeaderValue, CONTENT_TYPE}; use stream_download::{ self, http::HttpStream, source::SourceStream, storage::StorageProvider, StreamDownload, StreamHandle, StreamPhase, StreamState, @@ -709,9 +709,6 @@ impl Track { Ok(MediumType::Primary(medium)) } - /// Content type for media requests. - const JSON_CONTENT: HeaderValue = HeaderValue::from_static("application/json"); - /// Retrieves a media source for the track. /// /// Attempts to get download URLs for the requested quality level, @@ -731,6 +728,8 @@ impl Track { /// * Quality level is unknown /// * Media source unavailable /// * Network request fails + /// * HTTP response status is not successful (not 2xx) + /// * Response parsing fails /// /// # Quality Fallback /// @@ -811,10 +810,7 @@ impl Track { let get_url = media_url.join(Self::MEDIA_ENDPOINT)?; let body = serde_json::to_string(&request)?; - let mut request = client.post(get_url, body); - let headers = request.headers_mut(); - headers.insert(CONTENT_TYPE, Self::JSON_CONTENT); - + let request = client.json(get_url, body); let response = client.execute(request).await?; let body = response.text().await?; let items: media::Response = protocol::json(&body, Self::MEDIA_ENDPOINT)?; @@ -890,9 +886,11 @@ impl Track { /// # Errors /// /// Returns error if: - /// * No valid sources available - /// * Content unavailable in region + /// * No valid source found + /// * Track unavailable /// * Network error occurs + /// * HTTP response status is not successful (not 2xx) + /// * Download cannot start async fn open_stream(&self, client: &http::Client, medium: &Medium) -> Result { let now = SystemTime::now();