Skip to content

Commit

Permalink
feat: Implement API binding from the rust side for scrobblers
Browse files Browse the repository at this point in the history
  • Loading branch information
Losses committed Dec 19, 2024
1 parent d6a0bde commit ec8ac77
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 14 deletions.
7 changes: 1 addition & 6 deletions messages/scrobble.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,9 @@ message AuthenticateMultipleServiceRequest {
repeated LoginRequestItem requests = 1;
}

message AuthenticateMultipleServiceRequest {
repeated LoginRequestItem requests = 1;
}

// [RUST-SIGNAL]
message ScrobbleServiceStatus {
string serviceId = 1;
bool available = 2;
bool isAvailable = 2;
}

// [RUST-SIGNAL]
Expand Down
5 changes: 5 additions & 0 deletions native/hub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod mix;
mod playback;
mod player;
mod playlist;
mod scrobble;
mod search;
mod sfx;
mod stat;
Expand Down Expand Up @@ -54,6 +55,7 @@ use crate::mix::*;
use crate::playback::*;
use crate::player::initialize_player;
use crate::playlist::*;
use crate::scrobble::*;
use crate::search::*;
use crate::sfx::*;
use crate::stat::*;
Expand Down Expand Up @@ -209,6 +211,9 @@ async fn player_loop(path: String, db_connections: DatabaseConnections) {

FetchDirectoryTreeRequest => (main_db),

AuthenticateSingleServiceRequest => (scrobbler),
AuthenticateMultipleServiceRequest => (scrobbler),

ListLogRequest => (main_db),
ClearLogRequest => (main_db),
RemoveLogRequest => (main_db),
Expand Down
29 changes: 23 additions & 6 deletions native/hub/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use playback::MediaPosition;
use scrobbling::manager::ScrobblingManager;
use scrobbling::ScrobblingTrack;

use crate::{CrashResponse, PlaybackStatus, PlaylistItem, PlaylistUpdate, RealtimeFft};
use crate::{
CrashResponse, PlaybackStatus, PlaylistItem, PlaylistUpdate, RealtimeFft,
ScrobbleServiceStatus, ScrobbleServiceStatusUpdated,
};

pub fn metadata_summary_to_scrobbling_track(
metadata: &PlayingItemMetadataSummary,
Expand Down Expand Up @@ -71,7 +74,9 @@ pub async fn initialize_player(
let dispatcher_for_played_through = Arc::clone(&dispatcher);

let scrobber_for_played_through = Arc::clone(&scrobbler);
let scrobber_for_error_reporter = Arc::clone(&scrobbler);

let scrobber_error_receiver = scrobbler.lock().await.subscribe_error();
let scrobber_status_receiver = scrobbler.lock().await.subscribe_login_status();

manager.lock().await.initialize()?;

Expand Down Expand Up @@ -258,6 +263,21 @@ pub async fn initialize_player(
}
});

task::spawn(async move {
while let Ok(value) = scrobber_status_receiver.recv().await {
ScrobbleServiceStatusUpdated {
services: value
.into_iter()
.map(|x| ScrobbleServiceStatus {
service_id: x.service.to_string(),
is_available: x.is_available,
})
.collect(),
}
.send_signal_to_dart();
}
});

task::spawn(async move {
while let Ok(value) = realtime_fft_receiver.recv().await {
send_realtime_fft(value).await;
Expand All @@ -267,10 +287,7 @@ pub async fn initialize_player(
task::spawn(async move {
let main_db = Arc::clone(&main_db_for_error_reporter);

let scrobbler = Arc::clone(&scrobber_for_error_reporter);
let error_receiver = scrobbler.lock().await.subscribe_error();

while let Ok(error) = error_receiver.recv().await {
while let Ok(error) = scrobber_error_receiver.recv().await {
error!(
"Scrobbler received error: {:?}::{:?}: {:#?}",
error.service, error.action, error.error
Expand Down
71 changes: 71 additions & 0 deletions native/hub/src/scrobble.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::sync::Arc;

use anyhow::Result;
use rinf::DartSignal;
use tokio::sync::Mutex;

use scrobbling::manager::{ScrobblingCredential, ScrobblingManager};

use crate::{
AuthenticateMultipleServiceRequest, AuthenticateSingleServiceRequest,
AuthenticateSingleServiceResponse,
};

pub async fn authenticate_single_service_request(
scrobbler: Arc<Mutex<ScrobblingManager>>,
dart_signal: DartSignal<AuthenticateSingleServiceRequest>,
) -> Result<()> {
let request = dart_signal.message.request;

if let Some(request) = request {
let result = scrobbler
.lock()
.await
.authenticate(
&request.service_id.into(),
&request.username,
&request.password,
request.api_key,
request.api_secret,
)
.await;

match result {
Ok(_) => AuthenticateSingleServiceResponse {
success: true,
error: None,
}
.send_signal_to_dart(),
Err(e) => AuthenticateSingleServiceResponse {
success: false,
error: format!("{:#?}", e).into(),
}
.send_signal_to_dart(),
}
}

Ok(())
}

pub async fn authenticate_multiple_service_request(
scrobbler: Arc<Mutex<ScrobblingManager>>,
dart_signal: DartSignal<AuthenticateMultipleServiceRequest>,
) -> Result<()> {
let requests = dart_signal.message.requests;

ScrobblingManager::authenticate_all(
scrobbler,
requests
.into_iter()
.map(|x| ScrobblingCredential {
service: x.service_id.into(),
username: x.username,
password: x.password,
api_key: x.api_key,
api_secret: x.api_secret,
})
.collect(),
);

Ok(())
}
72 changes: 70 additions & 2 deletions scrobbling/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::VecDeque;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -19,6 +21,38 @@ pub enum ScrobblingService {
ListenBrainz,
}

impl fmt::Display for ScrobblingService {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let s = match self {
ScrobblingService::LastFm => "LastFm",
ScrobblingService::LibreFm => "LibreFm",
ScrobblingService::ListenBrainz => "ListenBrainz",
};
write!(f, "{}", s)
}
}

impl FromStr for ScrobblingService {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"LastFm" => Ok(ScrobblingService::LastFm),
"LibreFm" => Ok(ScrobblingService::LibreFm),
"ListenBrainz" => Ok(ScrobblingService::ListenBrainz),
_ => Err(()),
}
}
}

// Optionally implement From<String> using FromStr
impl From<String> for ScrobblingService {
fn from(s: String) -> Self {
ScrobblingService::from_str(&s)
.unwrap_or_else(|_| panic!("Invalid string for ScrobblingService: {}", s))
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ActionType {
Authenticate,
Expand All @@ -33,20 +67,27 @@ pub struct ScrobblingError {
pub error: anyhow::Error,
}

#[derive(Debug)]
pub struct LoginStatus {
pub service: ScrobblingService,
pub is_available: bool,
}

pub struct ScrobblingManager {
lastfm: Option<LastFmClient>,
librefm: Option<LibreFmClient>,
listenbrainz: Option<ListenBrainzClient>,
max_retries: u32,
retry_delay: Duration,
error_sender: Arc<SimpleSender<ScrobblingError>>,
login_status_sender: Arc<SimpleSender<Vec<LoginStatus>>>,

is_authenticating: bool,
now_playing_cache: VecDeque<ScrobblingTrack>,
scrobble_cache: VecDeque<ScrobblingTrack>,
}

pub struct Credentials {
pub struct ScrobblingCredential {
pub service: ScrobblingService,
pub username: String,
pub password: String,
Expand All @@ -57,6 +98,7 @@ pub struct Credentials {
impl ScrobblingManager {
pub fn new(max_retries: u32, retry_delay: Duration) -> Self {
let (error_sender, _) = SimpleChannel::channel(32);
let (login_status_sender, _) = SimpleChannel::channel(32);

Self {
lastfm: None,
Expand All @@ -65,13 +107,33 @@ impl ScrobblingManager {
max_retries,
retry_delay,
error_sender: Arc::new(error_sender),
login_status_sender: Arc::new(login_status_sender),

is_authenticating: false,
now_playing_cache: VecDeque::with_capacity(1),
scrobble_cache: VecDeque::with_capacity(48),
}
}

pub async fn send_login_status(&self) {
let statuses = vec![
LoginStatus {
service: ScrobblingService::LastFm,
is_available: self.lastfm.is_some(),
},
LoginStatus {
service: ScrobblingService::LibreFm,
is_available: self.librefm.is_some(),
},
LoginStatus {
service: ScrobblingService::ListenBrainz,
is_available: self.listenbrainz.is_some(),
},
];

self.login_status_sender.send(statuses);
}

pub async fn authenticate(
&mut self,
service: &ScrobblingService,
Expand Down Expand Up @@ -115,12 +177,14 @@ impl ScrobblingManager {
Ok(_) => {
self.is_authenticating = false;
self.process_cache().await;
self.send_login_status().await;
break;
}
Err(e) => {
attempts += 1;
if attempts >= self.max_retries {
self.is_authenticating = false;
self.send_login_status().await;
return Err(e);
}
sleep(self.retry_delay).await;
Expand Down Expand Up @@ -221,7 +285,7 @@ impl ScrobblingManager {
}
}

pub fn authenticate_all(manager: Arc<Mutex<Self>>, credentials_list: Vec<Credentials>) {
pub fn authenticate_all(manager: Arc<Mutex<Self>>, credentials_list: Vec<ScrobblingCredential>) {
tokio::spawn(async move {
for credentials in credentials_list {
let mut manager = manager.lock().await;
Expand Down Expand Up @@ -485,4 +549,8 @@ impl ScrobblingManager {
pub fn subscribe_error(&self) -> SimpleReceiver<ScrobblingError> {
self.error_sender.subscribe()
}

pub fn subscribe_login_status(&self) -> SimpleReceiver<Vec<LoginStatus>> {
self.login_status_sender.subscribe()
}
}

0 comments on commit ec8ac77

Please sign in to comment.