Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check if job payment is settled for partial payments #145

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Golem/ActivityLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public async Task<List<Job>> UpdateJobs(IJobs jobs, List<ActivityState> activity
var usage = d.Usage != null
? GolemUsage.From(d.Usage)
: null;

return await jobs.UpdateJobByActivity(d.Id, null, usage);
}
)
Expand Down
39 changes: 11 additions & 28 deletions Golem/Jobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public async Task<Job> GetOrCreateJob(string jobId)
{
Id = jobId,
RequestorId = requestorId,
Timestamp = agreement.Timestamp ?? throw new Exception($"Incomplete demand of agreement {agreement.AgreementID}")
Timestamp = agreement.Timestamp ?? throw new Exception($"Incomplete demand of agreement {agreement.AgreementID}"),
Logger = _logger,
};

var price = GetPriceFromAgreement(agreement);
Expand Down Expand Up @@ -171,18 +172,7 @@ public async Task<Job> UpdateJobPayment(Invoice invoice)
.Where(p => p.AgreementPayments.Exists(ap => ap.AgreementId == invoice.AgreementId) || p.ActivityPayments.Exists(ap => invoice.ActivityIds.Contains(ap.ActivityId)))
.ToList();
job.PaymentConfirmation = paymentsForRecentJob;
job.PaymentStatus = IntoPaymentStatus(invoice.Status);

var confirmedSum = job.PaymentConfirmation.Sum(payment => Convert.ToDecimal(payment.Amount, CultureInfo.InvariantCulture));

_logger.LogInformation($"Job: {job.Id}, confirmed sum: {confirmedSum}, job expected reward: {job.CurrentReward}");

// Workaround for yagna unable to change status to SETTLED when using partial payments
if (invoice.Status == InvoiceStatus.ACCEPTED
&& job.CurrentReward == confirmedSum)
{
job.PaymentStatus = IntoPaymentStatus(InvoiceStatus.SETTLED);
}
job.PaymentStatus = job.EvaluatePaymentStatus(Job.IntoPaymentStatus(invoice.Status));

return job;
}
Expand All @@ -193,7 +183,13 @@ public async Task UpdatePartialPayment(Payment payment)
{
var agreementId = await this._yagna.Api.GetActivityAgreement(activityPayment.ActivityId);
var job = await GetOrCreateJob(agreementId);
job.PartialPayment(payment);
job.AddPartialPayment(payment);
}

foreach (var agreementPayment in payment.AgreementPayments)
{
var job = await GetOrCreateJob(agreementPayment.AgreementId);
job.AddPartialPayment(payment);
}
}

Expand Down Expand Up @@ -251,18 +247,5 @@ private async Task<Job> UpdateJobUsage(string agreementId)
return job;
}

private static GolemLib.Types.PaymentStatus IntoPaymentStatus(InvoiceStatus status)
{
return status switch
{
InvoiceStatus.ISSUED => GolemLib.Types.PaymentStatus.InvoiceSent,
InvoiceStatus.RECEIVED => GolemLib.Types.PaymentStatus.InvoiceSent,
InvoiceStatus.ACCEPTED => GolemLib.Types.PaymentStatus.Accepted,
InvoiceStatus.REJECTED => GolemLib.Types.PaymentStatus.Rejected,
InvoiceStatus.FAILED => GolemLib.Types.PaymentStatus.Rejected,
InvoiceStatus.SETTLED => GolemLib.Types.PaymentStatus.Settled,
InvoiceStatus.CANCELLED => GolemLib.Types.PaymentStatus.Rejected,
_ => throw new Exception($"Unknown InvoiceStatus: {status}"),
};
}

}
39 changes: 38 additions & 1 deletion Golem/Yagna/Job.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
using System.ComponentModel;
using System.Globalization;
using System.Runtime.CompilerServices;

using Golem.Model;

using GolemLib;
using GolemLib.Types;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

using static Golem.Model.ActivityState;

namespace Golem.Yagna.Types
{
public class Job : IJob
{
public ILogger Logger { get; set; } = NullLogger.Instance;

public required string Id { get; init; }

public string RequestorId { get; init; } = "";
Expand Down Expand Up @@ -109,11 +115,12 @@ public void UpdateActivityState(ActivityStatePair activityState)
this.Status = ResolveStatus(currentState, nextState);
}

public void PartialPayment(Payment payment)
public void AddPartialPayment(Payment payment)
{
if (!PaymentConfirmation.Exists(pay => pay.PaymentId == payment.PaymentId))
{
PaymentConfirmation.Add(payment);
PaymentStatus = EvaluatePaymentStatus(PaymentStatus);
}
}

Expand All @@ -137,6 +144,36 @@ private JobStatus ResolveStatus(StateType currentState, StateType? nextState)
return JobStatus.Idle;
}

public GolemLib.Types.PaymentStatus? EvaluatePaymentStatus(GolemLib.Types.PaymentStatus? suggestedPaymentStatus)
{
var confirmedSum = this.PaymentConfirmation.Sum(payment => Convert.ToDecimal(payment.Amount, CultureInfo.InvariantCulture));

Logger.LogInformation($"Job: {this.Id}, confirmed sum: {confirmedSum}, job expected reward: {this.CurrentReward}");

// Workaround for yagna unable to change status to SETTLED when using partial payments
if (suggestedPaymentStatus == GolemLib.Types.PaymentStatus.Accepted
&& this.CurrentReward == confirmedSum)
{
return IntoPaymentStatus(InvoiceStatus.SETTLED);
}
return suggestedPaymentStatus;
}

public static GolemLib.Types.PaymentStatus IntoPaymentStatus(InvoiceStatus status)
{
return status switch
{
InvoiceStatus.ISSUED => GolemLib.Types.PaymentStatus.InvoiceSent,
InvoiceStatus.RECEIVED => GolemLib.Types.PaymentStatus.InvoiceSent,
InvoiceStatus.ACCEPTED => GolemLib.Types.PaymentStatus.Accepted,
InvoiceStatus.REJECTED => GolemLib.Types.PaymentStatus.Rejected,
InvoiceStatus.FAILED => GolemLib.Types.PaymentStatus.Rejected,
InvoiceStatus.SETTLED => GolemLib.Types.PaymentStatus.Settled,
InvoiceStatus.CANCELLED => GolemLib.Types.PaymentStatus.Rejected,
_ => throw new Exception($"Unknown InvoiceStatus: {status}"),
};
}

public override int GetHashCode()
{
return HashCode.Combine(Id, RequestorId, Status, PaymentStatus, PaymentConfirmation, CurrentUsage);
Expand Down
48 changes: 34 additions & 14 deletions example/ai-requestor/ai_runtime.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import asyncio
import base64
from concurrent.futures import ThreadPoolExecutor
import io
import json
import os
from typing import Final
from PIL import Image
import requests

from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta

from yapapi import Golem
from yapapi.payload import Payload
from yapapi.props import inf
from yapapi.props.base import constraint, prop
from yapapi.services import Service
from yapapi.log import enable_default_logger
from yapapi.config import ApiConfig

import argparse
import asyncio
Expand All @@ -30,9 +31,13 @@
from yapapi import windows_event_loop_fix
from yapapi.log import enable_default_logger
from yapapi.strategy import SCORE_TRUSTED, SCORE_REJECTED, MarketStrategy
from yapapi.rest import Activity
from yapapi.strategy.base import PropValueRange, PROP_DEBIT_NOTE_INTERVAL_SEC, PROP_PAYMENT_TIMEOUT_SEC
from ya_activity import RequestorControlApi

from ya_activity import ApiClient, ApiException, RequestorControlApi, RequestorStateApi
PROP_PAYMENT_TIMEOUT_SEC: Final[str] = "golem.com.scheme.payu.payment-timeout-sec?"
PROP_DEBIT_NOTE_ACCEPTANCE_TIMEOUT: Final[str] = "golem.com.payment.debit-notes.accept-timeout?"

MID_AGREEMENT_PAYMENTS_PROPS = [PROP_DEBIT_NOTE_INTERVAL_SEC, PROP_PAYMENT_TIMEOUT_SEC]

# Utils

Expand Down Expand Up @@ -61,6 +66,7 @@ def build_parser(description: str) -> argparse.ArgumentParser:
"--payment-network", "--network", help="Payment network name, for example `holesky`"
)
parser.add_argument("--subnet-tag", help="Subnet name, for example `public`")
parser.add_argument("--runtime", default="automatic", help="Runtime name, for example `automatic`")
parser.add_argument(
"--log-file",
default=str(default_log_path),
Expand Down Expand Up @@ -138,12 +144,16 @@ class ProviderOnceStrategy(MarketStrategy):

def __init__(self):
self.history = set(())
self.acceptable_prop_value_range_overrides = {
PROP_DEBIT_NOTE_INTERVAL_SEC: PropValueRange(60, None),
PROP_PAYMENT_TIMEOUT_SEC: PropValueRange(int(180), None),
}

async def score_offer(self, offer):
if offer.issuer not in self.history:
return SCORE_TRUSTED
else:
print(f"Rejecting issuer: {offer.props['golem.node.id.name']} ({offer.issuer})")
print(f"[Strategy] Rejecting issuer: {offer.props['golem.node.id.name']} ({offer.issuer})")
return SCORE_REJECTED


Expand All @@ -152,8 +162,8 @@ def remember(self, provider_id: str):

# App

RUNTIME_NAME = "automatic"
#RUNTIME_NAME = "dummy"
#RUNTIME_NAME = "automatic"
RUNTIME_NAME = "dummy"

@dataclass
class AiPayload(Payload):
Expand All @@ -164,12 +174,18 @@ class AiPayload(Payload):


class AiRuntimeService(Service):
runtime: str

@staticmethod
async def get_payload():
## TODO switched into using smaller model to avoid problems during tests. Resolve it when automatic runtime integrated
# return AiPayload(image_url="hash:sha3:92180a67d096be309c5e6a7146d89aac4ef900e2bf48a52ea569df7d:https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors?download=true")
# return AiPayload(image_url="hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true")
return AiPayload(image_url="hash:sha3:b2da48d618beddab1887739d75b50a3041c810bc73805a416761185998359b24:https://huggingface.co/runwayml/stable-diffusion-v1-5/resolve/main/v1-5-pruned-emaonly.safetensors?download=true")
return AiPayload(
image_url="hash:sha3:b2da48d618beddab1887739d75b50a3041c810bc73805a416761185998359b24:https://huggingface.co/runwayml/stable-diffusion-v1-5/resolve/main/v1-5-pruned-emaonly.safetensors?download=true",
runtime=AiRuntimeService.runtime,
)

async def start(self):
self.strategy.remember(self._ctx.provider_id)

Expand All @@ -178,9 +194,6 @@ async def start(self):
script.start()
yield script

# async def run(self):
# # TODO run AI tasks here

def __init__(self, strategy: ProviderOnceStrategy):
super().__init__()
self.strategy = strategy
Expand Down Expand Up @@ -214,21 +227,27 @@ async def trigger(activity: RequestorControlApi, token, prompt, output_file):
print(f"Error code: {response.status_code}, message: {response.text}")


async def main(subnet_tag, driver=None, network=None):
async def ainput(prompt: str = ""):
return await asyncio.to_thread(input, prompt)


async def main(subnet_tag, driver=None, network=None, args=None):
strategy = ProviderOnceStrategy()
async with Golem(
budget=10.0,
budget=100.0,
subnet_tag=subnet_tag,
strategy=strategy,
payment_driver=driver,
payment_network=network,
) as golem:
AiRuntimeService.runtime = args.runtime
cluster = await golem.run_service(
AiRuntimeService,
instance_params=[
{"strategy": strategy}
],
num_instances=1,
expiration=datetime.now(timezone.utc) + timedelta(days=10),
)

def instances():
Expand Down Expand Up @@ -273,7 +292,7 @@ async def get_image(prompt, file_name):

if len(running) > 0:
print('Please type your prompt:')
prompt = input()
prompt = await ainput()
print('Sending to automatic')
await get_image(
prompt,
Expand All @@ -294,6 +313,7 @@ async def get_image(prompt, file_name):
subnet_tag=args.subnet_tag,
driver=args.payment_driver,
network=args.payment_network,
args=args,
),
log_file=args.log_file,
)
Loading
Loading