From 2ce3ab3fba30d8c941e33b85335f17a24c3ddb65 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Fri, 8 Mar 2024 11:55:06 +0100 Subject: [PATCH] payment: allow paying from deposits * Add deposit field to Allocation: * deposit id and contract address * Validate allocations with deposits by checking deposit status instead of the internal account balance. * Make SchedulePayment deposit-aware. --- Cargo.lock | 16 +-- Cargo.toml | 8 +- core/model/src/driver.rs | 14 +- core/model/src/payment.rs | 1 + core/payment-driver/erc20/src/driver.rs | 131 ++++++++++++++---- core/payment/examples/debit_note_flow.rs | 2 + .../down.sql | 1 + .../up.sql | 1 + core/payment/src/api/allocations.rs | 2 + core/payment/src/models/allocation.rs | 13 +- core/payment/src/processor.rs | 35 ++++- core/payment/src/schema.rs | 1 + core/payment/src/service.rs | 4 +- 13 files changed, 183 insertions(+), 46 deletions(-) create mode 100644 core/payment/migrations/2024-03-08-123456_allocation_deposit/down.sql create mode 100644 core/payment/migrations/2024-03-08-123456_allocation_deposit/up.sql diff --git a/Cargo.lock b/Cargo.lock index 5298cfa7a5..791f848cc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2014,7 +2014,7 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erc20_payment_lib" version = "0.4.1" -source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=0304f7f3075b2a353c028370f56ddb83baf7581c#0304f7f3075b2a353c028370f56ddb83baf7581c" +source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=d82a4e2031ec2f434e301190480b5522f371848b#d82a4e2031ec2f434e301190480b5522f371848b" dependencies = [ "actix", "actix-files", @@ -2056,7 +2056,7 @@ dependencies = [ [[package]] name = "erc20_payment_lib_common" version = "0.4.1" -source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=0304f7f3075b2a353c028370f56ddb83baf7581c#0304f7f3075b2a353c028370f56ddb83baf7581c" +source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=d82a4e2031ec2f434e301190480b5522f371848b#d82a4e2031ec2f434e301190480b5522f371848b" dependencies = [ "actix-files", "actix-web", @@ -2097,7 +2097,7 @@ dependencies = [ [[package]] name = "erc20_payment_lib_extra" version = "0.4.1" -source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=0304f7f3075b2a353c028370f56ddb83baf7581c#0304f7f3075b2a353c028370f56ddb83baf7581c" +source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=d82a4e2031ec2f434e301190480b5522f371848b#d82a4e2031ec2f434e301190480b5522f371848b" dependencies = [ "actix-files", "actix-web", @@ -2133,7 +2133,7 @@ dependencies = [ [[package]] name = "erc20_processor" version = "0.4.1" -source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=0304f7f3075b2a353c028370f56ddb83baf7581c#0304f7f3075b2a353c028370f56ddb83baf7581c" +source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=d82a4e2031ec2f434e301190480b5522f371848b#d82a4e2031ec2f434e301190480b5522f371848b" dependencies = [ "actix-cors", "actix-files", @@ -2181,7 +2181,7 @@ dependencies = [ [[package]] name = "erc20_rpc_pool" version = "0.4.1" -source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=0304f7f3075b2a353c028370f56ddb83baf7581c#0304f7f3075b2a353c028370f56ddb83baf7581c" +source = "git+https://github.com/golemfactory/erc20_payment_lib?rev=d82a4e2031ec2f434e301190480b5522f371848b#d82a4e2031ec2f434e301190480b5522f371848b" dependencies = [ "actix-files", "actix-web", @@ -8326,8 +8326,7 @@ dependencies = [ [[package]] name = "ya-client" version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88e1d7668f54df1922a3ec157d09d818c059bbf8a49e47e2a5d9b17de13f8df" +source = "git+https://github.com/golemfactory/ya-client.git?rev=e58208bdd521afbb626021d94b0b91a7d17d4e9d#e58208bdd521afbb626021d94b0b91a7d17d4e9d" dependencies = [ "actix-codec", "awc", @@ -8376,8 +8375,7 @@ dependencies = [ [[package]] name = "ya-client-model" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe85a762be0297b9848ad0b7f1c73cb2afe778bb8eb3a952efd032792631a3e0" +source = "git+https://github.com/golemfactory/ya-client.git?rev=e58208bdd521afbb626021d94b0b91a7d17d4e9d#e58208bdd521afbb626021d94b0b91a7d17d4e9d" dependencies = [ "bigdecimal 0.2.2", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 600c2df021..4cd25a68b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -232,8 +232,8 @@ members = [ # diesel 1.4.* supports up to 0.23.0, but sqlx 0.5.9 requires 0.22.0 # sqlx 0.5.10 need 0.23.2, so 0.5.9 is last version possible derive_more = "0.99.11" -erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "0304f7f3075b2a353c028370f56ddb83baf7581c" } -erc20_processor = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "0304f7f3075b2a353c028370f56ddb83baf7581c" } +erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "d82a4e2031ec2f434e301190480b5522f371848b" } +erc20_processor = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "d82a4e2031ec2f434e301190480b5522f371848b" } #erc20_payment_lib = { path = "../../payments/erc20_payment_lib/crates/erc20_payment_lib" } #erc20_processor = { path = "../../payments/erc20_payment_lib" } #erc20_payment_lib = { version = "=0.4.1" } @@ -280,9 +280,9 @@ ya-service-api-interfaces = { path = "core/serv-api/interfaces" } ya-service-api-web = { path = "core/serv-api/web" } ## CLIENT -#ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "57dc4808f9db63ce20e4e7b799fd6a1613610d0a" } +ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "e58208bdd521afbb626021d94b0b91a7d17d4e9d" } #ya-client = { path = "../ya-client" } -#ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "v0.7.0" } +ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "e58208bdd521afbb626021d94b0b91a7d17d4e9d" } #ya-client-model = "0.7" golem-certificate = { git = "https://github.com/golemfactory/golem-certificate.git", rev = "f2d7514c18fc066e9cfb796090b90f5b27cfe1c6" } diff --git a/core/model/src/driver.rs b/core/model/src/driver.rs index 8aa7c04240..02f585ea17 100644 --- a/core/model/src/driver.rs +++ b/core/model/src/driver.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fmt::Display; use std::time::Duration; -use ya_client_model::payment::{Allocation, DriverStatusProperty, Payment}; +use ya_client_model::payment::{allocation::Deposit, Allocation, DriverStatusProperty, Payment}; use ya_service_bus::RpcMessage; pub fn driver_bus_id(driver_name: T) -> String { @@ -316,6 +316,7 @@ pub struct SchedulePayment { sender: String, recipient: String, platform: String, + deposit_id: Option, due_date: DateTime, } @@ -325,6 +326,7 @@ impl SchedulePayment { sender: String, recipient: String, platform: String, + deposit_id: Option, due_date: DateTime, ) -> SchedulePayment { SchedulePayment { @@ -332,6 +334,7 @@ impl SchedulePayment { sender, recipient, platform, + deposit_id, due_date, } } @@ -352,6 +355,10 @@ impl SchedulePayment { self.platform.clone() } + pub fn deposit_id(&self) -> Option { + self.deposit_id.clone() + } + pub fn due_date(&self) -> DateTime { self.due_date } @@ -370,6 +377,8 @@ pub struct ValidateAllocation { pub address: String, pub platform: String, pub amount: BigDecimal, + pub timeout: Option>, + pub deposit: Option, pub existing_allocations: Vec, } @@ -378,12 +387,15 @@ impl ValidateAllocation { address: String, platform: String, amount: BigDecimal, + timeout: Option>, existing: Vec, ) -> Self { ValidateAllocation { address, platform, amount, + timeout, + deposit: None, existing_allocations: existing, } } diff --git a/core/model/src/payment.rs b/core/model/src/payment.rs index 41ba8cb498..40402f914a 100644 --- a/core/model/src/payment.rs +++ b/core/model/src/payment.rs @@ -425,6 +425,7 @@ pub mod local { pub platform: String, pub address: String, pub amount: BigDecimal, + pub timeout: Option>, } impl RpcMessage for ValidateAllocation { diff --git a/core/payment-driver/erc20/src/driver.rs b/core/payment-driver/erc20/src/driver.rs index c150414905..268266c8d1 100644 --- a/core/payment-driver/erc20/src/driver.rs +++ b/core/payment-driver/erc20/src/driver.rs @@ -24,6 +24,7 @@ use std::time::Instant; use tokio::sync::mpsc::Receiver; use uuid::Uuid; use web3::types::{Address, H256}; +use ya_client_model::payment::allocation::Deposit; use ya_client_model::payment::DriverStatusProperty; use ya_payment_driver::driver::IdentityError; @@ -118,6 +119,7 @@ impl Erc20Driver { amount: &BigDecimal, network: &str, deadline: Option>, + deposit_id: Option, ) -> Result { self.is_account_active(sender).await?; let sender = H160::from_str(sender) @@ -137,8 +139,7 @@ impl Erc20Driver { amount, payment_id: payment_id.clone(), deadline, - allocation_id: None, - use_internal: false, + deposit_id, }) .await .map_err(|err| GenericError::new(format!("Error when inserting transfer {err:?}")))?; @@ -270,6 +271,12 @@ impl Erc20Driver { network, }) } + LibStatusProperty::InvalidChainId { chain_id, .. } => { + Some(DriverStatusProperty::InvalidChainId { + driver: DRIVER_NAME.into(), + chain_id, + }) + } }) .collect()) } @@ -354,6 +361,93 @@ impl Erc20Driver { ) .await } + + async fn validate_allocation_internal( + &self, + caller: String, + msg: ValidateAllocation, + ) -> Result { + if msg.deposit.is_some() { + Err(GenericError::new( + "validate_allocation_internal called with not empty deposit", + ))?; + } + + let account_balance = self + .get_account_balance( + caller, + GetAccountBalance::new(msg.address, msg.platform.clone()), + ) + .await?; + + let total_allocated_amount: BigDecimal = msg + .existing_allocations + .into_iter() + .filter(|allocation| allocation.payment_platform == msg.platform) + .map(|allocation| allocation.remaining_amount) + .sum(); + + log::info!( + "Allocation validation: \ + allocating: {:.5}, \ + account_balance: {:.5}, \ + total_allocated_amount: {:.5}", + msg.amount, + account_balance, + total_allocated_amount, + ); + + Ok(msg.amount <= account_balance - total_allocated_amount) + } + + async fn validate_allocation_deposit( + &self, + msg: ValidateAllocation, + deposit: Deposit, + ) -> Result { + let network = msg + .platform + .split('-') + .nth(1) + .ok_or(GenericError::new(format!( + "Malformed platform string: {}", + msg.platform + )))?; + + let deposit_details = self + .payment_runtime + .deposit_details( + network.to_string(), + U256::from_str(&deposit.id).unwrap(), + Address::from_str(&deposit.contract).unwrap(), + ) + .await + .map_err(|e| GenericError::new(e))?; + let deposit_balance = + BigDecimal::new(BigInt::from_str(&deposit_details.amount).unwrap(), 18); + + log::info!( + "Allocation validation with deposit: \ + allocating: {:.5}, \ + deposit balance: {:.5}, \ + requested timeout: {}, \ + deposit valid to: {}", + msg.amount, + deposit_balance, + msg.timeout + .map(|tm| tm.to_string()) + .unwrap_or(String::from("never")), + deposit_details.valid_to, + ); + + let valid_amount = msg.amount <= deposit_balance; + let valid_timeout = msg + .timeout + .map(|timeout| timeout <= deposit_details.valid_to) + .unwrap_or(false); + + Ok(valid_amount && valid_timeout) + } } #[async_trait(?Send)] @@ -814,6 +908,7 @@ impl PaymentDriver for Erc20Driver { &msg.amount, &network, Some(Utc::now()), + None, ) .await } @@ -839,6 +934,7 @@ impl PaymentDriver for Erc20Driver { &msg.amount(), network, Some(msg.due_date() - transfer_margin), + msg.deposit_id(), ) .await } @@ -887,34 +983,15 @@ impl PaymentDriver for Erc20Driver { async fn validate_allocation( &self, caller: String, - msg: ValidateAllocation, + mut msg: ValidateAllocation, ) -> Result { log::debug!("Validate_allocation: {:?}", msg); - let account_balance = self - .get_account_balance( - caller, - GetAccountBalance::new(msg.address, msg.platform.clone()), - ) - .await?; - let total_allocated_amount: BigDecimal = msg - .existing_allocations - .into_iter() - .filter(|allocation| allocation.payment_platform == msg.platform) - .map(|allocation| allocation.remaining_amount) - .sum(); - - log::info!( - "Allocation validation: \ - allocating: {:.5}, \ - account_balance: {:.5}, \ - total_allocated_amount: {:.5}", - msg.amount, - account_balance, - total_allocated_amount, - ); - - Ok(msg.amount <= account_balance - total_allocated_amount) + if let Some(deposit) = msg.deposit.take() { + self.validate_allocation_deposit(msg, deposit).await + } else { + self.validate_allocation_internal(caller, msg).await + } } async fn status( diff --git a/core/payment/examples/debit_note_flow.rs b/core/payment/examples/debit_note_flow.rs index 56e36c47b2..6d7bf88689 100644 --- a/core/payment/examples/debit_note_flow.rs +++ b/core/payment/examples/debit_note_flow.rs @@ -76,9 +76,11 @@ async fn main() -> anyhow::Result<()> { address: None, // Use default address (i.e. identity) payment_platform: Some(PaymentPlatformEnum::PaymentPlatformName(args.platform)), total_amount: BigDecimal::from(10u64), + timeout: None, make_deposit: false, deposit: None, timeout: None, + make_deposit: false, }) .await?; log::info!("Allocation created."); diff --git a/core/payment/migrations/2024-03-08-123456_allocation_deposit/down.sql b/core/payment/migrations/2024-03-08-123456_allocation_deposit/down.sql new file mode 100644 index 0000000000..86bd2f21c6 --- /dev/null +++ b/core/payment/migrations/2024-03-08-123456_allocation_deposit/down.sql @@ -0,0 +1 @@ +ALTER TABLE pay_allocation REMOVE COLUMN deposit; diff --git a/core/payment/migrations/2024-03-08-123456_allocation_deposit/up.sql b/core/payment/migrations/2024-03-08-123456_allocation_deposit/up.sql new file mode 100644 index 0000000000..6d73e33c5c --- /dev/null +++ b/core/payment/migrations/2024-03-08-123456_allocation_deposit/up.sql @@ -0,0 +1 @@ +ALTER TABLE pay_allocation ADD COLUMN deposit TEXT DEFAULT NULL; \ No newline at end of file diff --git a/core/payment/src/api/allocations.rs b/core/payment/src/api/allocations.rs index 4748675975..d80e4e2bbf 100644 --- a/core/payment/src/api/allocations.rs +++ b/core/payment/src/api/allocations.rs @@ -339,6 +339,7 @@ async fn create_allocation( platform: payment_triple.to_string(), address: address.clone(), amount: allocation.total_amount.clone(), + timeout: allocation.timeout, }; match async move { Ok(bus::service(LOCAL_SERVICE).send(validate_msg).await??) }.await { @@ -488,6 +489,7 @@ async fn amend_allocation( } else { 0.into() }, + timeout: amended_allocation.timeout, }; match async move { Ok(bus::service(LOCAL_SERVICE).send(validate_msg).await??) }.await { Ok(true) => {} diff --git a/core/payment/src/models/allocation.rs b/core/payment/src/models/allocation.rs index 94b299636e..4cce971d38 100644 --- a/core/payment/src/models/allocation.rs +++ b/core/payment/src/models/allocation.rs @@ -17,6 +17,7 @@ pub struct WriteObj { pub remaining_amount: BigDecimalField, pub timeout: Option, pub make_deposit: bool, + pub deposit: Option, pub released: bool, } @@ -33,6 +34,7 @@ pub struct ReadObj { pub timestamp: NaiveDateTime, pub timeout: Option, pub make_deposit: bool, + pub deposit: Option, pub released: bool, } @@ -53,6 +55,9 @@ impl WriteObj { remaining_amount: allocation.total_amount.into(), timeout: allocation.timeout.map(|v| v.naive_utc()), make_deposit: allocation.make_deposit, + deposit: allocation + .deposit + .map(|deposit| serde_json::to_string(&deposit).unwrap()), released: false, } } @@ -68,6 +73,9 @@ impl WriteObj { remaining_amount: allocation.remaining_amount.into(), timeout: allocation.timeout.map(|v| v.naive_utc()), make_deposit: allocation.make_deposit, + deposit: allocation + .deposit + .map(|deposit| serde_json::to_string(&deposit).unwrap()), released: false, } } @@ -85,7 +93,10 @@ impl From for Allocation { timestamp: Utc.from_utc_datetime(&allocation.timestamp), timeout: allocation.timeout.map(|v| Utc.from_utc_datetime(&v)), make_deposit: allocation.make_deposit, - deposit: None, + deposit: allocation + .deposit + .map(|s| serde_json::from_str(&s).ok()) + .flatten(), } } } diff --git a/core/payment/src/processor.rs b/core/payment/src/processor.rs index f80a972cb6..ffbbb31566 100644 --- a/core/payment/src/processor.rs +++ b/core/payment/src/processor.rs @@ -1,5 +1,7 @@ use crate::api::allocations::{forced_release_allocation, release_allocation_after}; -use crate::dao::{ActivityDao, AgreementDao, AllocationDao, OrderDao, PaymentDao, SyncNotifsDao}; +use crate::dao::{ + ActivityDao, AgreementDao, AllocationDao, AllocationStatus, OrderDao, PaymentDao, SyncNotifsDao, +}; use crate::error::processor::{ AccountNotRegistered, GetStatusError, NotifyPaymentError, OrderValidationError, SchedulePaymentError, ValidateAllocationError, VerifyPaymentError, @@ -9,10 +11,12 @@ use crate::payment_sync::SYNC_NOTIFS_NOTIFY; use crate::timeout_lock::{MutexTimeoutExt, RwLockTimeoutExt}; use actix_web::web::Data; use bigdecimal::{BigDecimal, Zero}; +use chrono::{DateTime, Utc}; use futures::{FutureExt, TryFutureExt}; use metrics::counter; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -533,7 +537,11 @@ impl PaymentProcessor { Ok(()) } - pub async fn schedule_payment(&self, msg: SchedulePayment) -> Result<(), SchedulePaymentError> { + pub async fn schedule_payment( + &self, + caller: String, + msg: SchedulePayment, + ) -> Result<(), SchedulePaymentError> { if self.in_shutdown.load(Ordering::SeqCst) { return Err(SchedulePaymentError::Shutdown); } @@ -544,6 +552,25 @@ impl PaymentProcessor { &amount ))); } + + let allocation_status = self + .db_executor + .lock() + .await + .as_dao::() + .get( + msg.allocation_id.clone(), + NodeId::from_str(&caller) + .map_err(|e| SchedulePaymentError::InvalidInput(e.to_string()))?, + ) + .await?; + + let deposit_id = if let AllocationStatus::Active(allocation) = allocation_status { + allocation.deposit.clone().map(|deposit| deposit.id) + } else { + None + }; + let driver = self .registry .timeout_read(REGISTRY_LOCK_TIMEOUT) @@ -555,6 +582,7 @@ impl PaymentProcessor { msg.payer_addr.clone(), msg.payee_addr.clone(), msg.payment_platform.clone(), + deposit_id, msg.due_date, )) .await??; @@ -778,6 +806,7 @@ impl PaymentProcessor { platform: String, address: String, amount: BigDecimal, + timeout: Option>, ) -> Result { if self.in_shutdown.load(Ordering::SeqCst) { return Err(ValidateAllocationError::Shutdown); @@ -798,6 +827,8 @@ impl PaymentProcessor { address, platform, amount, + timeout, + deposit: None, existing_allocations, }; let result = driver_endpoint(&driver).send(msg).await??; diff --git a/core/payment/src/schema.rs b/core/payment/src/schema.rs index 325750732e..8fbdb7b523 100644 --- a/core/payment/src/schema.rs +++ b/core/payment/src/schema.rs @@ -60,6 +60,7 @@ table! { timestamp -> Timestamp, timeout -> Nullable, make_deposit -> Bool, + deposit -> Nullable, released -> Bool, } } diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index 6d5d25ad62..c41aac975b 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -131,7 +131,7 @@ mod local { msg: SchedulePayment, ) -> Result<(), GenericError> { log::debug!("Schedule payment processor started"); - let res = processor.schedule_payment(msg).await; + let res = processor.schedule_payment(sender, msg).await; log::debug!("Schedule payment processor finished"); Ok(res?) } @@ -414,7 +414,7 @@ mod local { msg: ValidateAllocation, ) -> Result { Ok(processor - .validate_allocation(msg.platform, msg.address, msg.amount) + .validate_allocation(msg.platform, msg.address, msg.amount, msg.timeout) .await?) }