Skip to content

Commit

Permalink
Merge pull request #1109 from golemfactory/mf/prov-activity-event-seq…
Browse files Browse the repository at this point in the history
…uence

Provider agent: process activity events sequentially
  • Loading branch information
mfranciszkiewicz authored Mar 3, 2021
2 parents 804021e + ed1fd76 commit 10b8f7c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 32 deletions.
30 changes: 19 additions & 11 deletions agent/provider/src/provider_agent.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use actix::prelude::*;
use actix::utils::IntervalFunc;
use anyhow::{anyhow, Error};
use futures::{future, FutureExt, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize};
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};
use std::{fs, io};

use ya_agreement_utils::agreement::TypedArrayPointer;
use ya_agreement_utils::*;
use ya_client::cli::ProviderApi;
use ya_core_model::{payment::local::NetworkName, NodeId};
use ya_file_logging::{start_logger, LoggerHandle};
use ya_utils_actix::actix_handler::send_message;
use ya_utils_path::SwapSave;

use crate::dir::clean_provider_dir;
Expand Down Expand Up @@ -312,10 +310,6 @@ impl ProviderAgent {
Ok(cnts.to_string())
}

fn schedule_jobs(&mut self, _ctx: &mut Context<Self>) {
send_message(self.runner.clone(), UpdateActivity);
}

fn create_node_info(&self) -> NodeInfo {
let globals = self.globals.get_state();

Expand Down Expand Up @@ -428,13 +422,27 @@ fn get_usage_vector_value(prices: &Vec<(String, f64)>) -> serde_json::Value {
serde_json::Value::Array(vec)
}

async fn process_activity_events(runner: Addr<TaskRunner>) {
const ZERO: Duration = Duration::from_secs(0);
const DEFAULT: Duration = Duration::from_secs(4);

loop {
let started = SystemTime::now();
if let Err(error) = runner.send(UpdateActivity).await {
log::error!("Error processing activity events: {:?}", error);
}
let elapsed = SystemTime::now().duration_since(started).unwrap_or(ZERO);
let delay = DEFAULT.checked_sub(elapsed).unwrap_or(ZERO);
tokio::time::delay_for(delay).await;
}
}

impl Actor for ProviderAgent {
type Context = Context<Self>;

fn started(&mut self, context: &mut Context<Self>) {
IntervalFunc::new(Duration::from_secs(4), Self::schedule_jobs)
.finish()
.spawn(context);
fn started(&mut self, ctx: &mut Context<Self>) {
let runner = self.runner.clone();
ctx.spawn(process_activity_events(runner).into_actor(self));
}
}

Expand Down
21 changes: 0 additions & 21 deletions utils/actix_utils/src/actix_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use actix::prelude::dev::ToEnvelope;
use actix::prelude::*;
use log::error;

/// Trait that allows to extract error type ok type from Result.
/// Could use std::ops::Try, but it is marked as unstable.
pub trait ResultTypeGetter {
Expand Down Expand Up @@ -36,20 +32,3 @@ macro_rules! forward_actix_handler {
}
};
} // forward_actix_handler

// Sends message to other actor.
pub fn send_message<ActorType, MessageType>(actor: Addr<ActorType>, msg: MessageType)
where
MessageType: Message + Send + 'static,
MessageType::Result: Send,
ActorType: Handler<MessageType>,
ActorType::Context: ToEnvelope<ActorType, MessageType>,
{
let future = async move {
if let Err(error) = actor.send(msg).await {
//TODO: We could print more information about error.
error!("Error sending message: {}.", error);
};
};
Arbiter::spawn(future);
}

0 comments on commit 10b8f7c

Please sign in to comment.