Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor farmer metrics #2531

Merged
merged 1 commit into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 39 additions & 25 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ mod dsn;
mod metrics;

use crate::commands::farm::dsn::configure_dsn;
use crate::commands::farm::metrics::FarmerMetrics;
use crate::commands::farm::metrics::{FarmerMetrics, SectorState};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use futures::channel::oneshot;
use futures::stream::FuturesUnordered;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
Expand Down Expand Up @@ -665,25 +665,23 @@ where

info!("Finished collecting already plotted pieces successfully");

for single_disk_farm in single_disk_farms.iter() {
farmer_metrics.update_farm_size(
single_disk_farm.id(),
single_disk_farm.total_sectors_count(),
);
farmer_metrics.inc_farm_plotted(
single_disk_farm.id(),
single_disk_farm
.plotted_sectors_count()
.await
.try_into()
.unwrap(),
);
}
let total_and_plotted_sectors = single_disk_farms
.iter()
.map(|single_disk_farm| async {
let total_sector_count = single_disk_farm.total_sectors_count();
let plotted_sectors_count = single_disk_farm.plotted_sectors_count().await;

(total_sector_count, plotted_sectors_count)
})
.collect::<FuturesOrdered<_>>()
.collect::<Vec<_>>()
.await;

let mut single_disk_farms_stream = single_disk_farms
.into_iter()
.enumerate()
.map(|(disk_farm_index, single_disk_farm)| {
.zip(total_and_plotted_sectors)
.map(|((disk_farm_index, single_disk_farm), sector_counts)| {
let disk_farm_index = disk_farm_index.try_into().expect(
"More than 256 plots are not supported, this is checked above already; qed",
);
Expand All @@ -709,6 +707,17 @@ where
}
};

let (total_sector_count, plotted_sectors_count) = sector_counts;
farmer_metrics.update_sectors_total(
single_disk_farm.id(),
total_sector_count - plotted_sectors_count,
SectorState::NotPlotted,
);
farmer_metrics.update_sectors_total(
single_disk_farm.id(),
plotted_sectors_count,
SectorState::Plotted,
);
single_disk_farm
.on_sector_update(Arc::new({
let single_disk_farm_id = *single_disk_farm.id();
Expand Down Expand Up @@ -748,19 +757,24 @@ where
on_plotted_sector_callback(plotted_sector, old_plotted_sector);
farmer_metrics.observe_sector_plotting_time(&single_disk_farm_id, time);
farmer_metrics.sector_plotted.inc();
if old_plotted_sector.is_some() {
farmer_metrics.inc_farm_replotted(&single_disk_farm_id);
} else {
farmer_metrics.inc_farm_plotted(&single_disk_farm_id, 1);
}
farmer_metrics
.update_sector_state(&single_disk_farm_id, SectorState::Plotted);
}
SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => {
farmer_metrics.inc_farm_about_to_expire(&single_disk_farm_id, 1);
farmer_metrics.update_sector_state(
&single_disk_farm_id,
SectorState::AboutToExpire,
);
}
SectorUpdate::Expiration(SectorExpirationDetails::Expired) => {
farmer_metrics.inc_farm_expired(&single_disk_farm_id, 1);
farmer_metrics
.update_sector_state(&single_disk_farm_id, SectorState::Expired);
}
SectorUpdate::Expiration(SectorExpirationDetails::Determined {
..
}) => {
// Not interested in here
}
_ => {}
}
}))
.detach();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,32 @@ use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::{Registry, Unit};
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Duration;
use subspace_core_primitives::SectorIndex;
use subspace_farmer::single_disk_farm::farming::ProvingResult;
use subspace_farmer::single_disk_farm::{FarmingError, SingleDiskFarmId};

/// State a farm sector is counted under in the farmer's per-state sector gauge.
///
/// The `Display` text of a variant is used as the value of the `state` metric label.
#[derive(Debug, Copy, Clone)]
pub(super) enum SectorState {
/// Sector that belongs to the farm but has not been plotted yet
NotPlotted,
/// Sector that is currently plotted
Plotted,
/// Sector reported as about to expire
AboutToExpire,
/// Sector reported as expired
Expired,
}

impl fmt::Display for SectorState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::NotPlotted => "NotPlotted",
Self::Plotted => "Plotted",
Self::AboutToExpire => "AboutToExpire",
Self::Expired => "Expired",
})
}
}

#[derive(Debug, Clone)]
pub(super) struct FarmerMetrics {
auditing_time: Family<Vec<(String, String)>, Histogram>,
Expand All @@ -18,10 +38,7 @@ pub(super) struct FarmerMetrics {
sector_encoding_time: Family<Vec<(String, String)>, Histogram>,
sector_writing_time: Family<Vec<(String, String)>, Histogram>,
sector_plotting_time: Family<Vec<(String, String)>, Histogram>,
farm_size: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
farm_plotted: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
farm_expired: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
farm_about_to_expire: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
sectors_total: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
pub(super) sector_downloading: Counter<u64, AtomicU64>,
pub(super) sector_downloaded: Counter<u64, AtomicU64>,
pub(super) sector_encoding: Counter<u64, AtomicU64>,
Expand Down Expand Up @@ -110,40 +127,13 @@ impl FarmerMetrics {
sector_plotting_time.clone(),
);

let farm_size = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_size",
"Farm size",
Unit::Other("sectors".to_string()),
farm_size.clone(),
);

let farm_plotted = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_plotted",
"Number of plotted farm sectors",
Unit::Other("sectors".to_string()),
farm_plotted.clone(),
);

let farm_expired = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_expired",
"Number of expired farm sectors",
Unit::Other("sectors".to_string()),
farm_expired.clone(),
);

let farm_about_to_expire = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);
let sectors_total = Family::<_, _>::new_with_constructor(Gauge::<_, _>::default);

sub_registry.register_with_unit(
"farm_about_to_expire",
"Number of farm sectors about to expire",
"sectors_total",
"Total number of sectors with corresponding state",
Unit::Other("sectors".to_string()),
farm_about_to_expire.clone(),
sectors_total.clone(),
);

let sector_downloading = Counter::<_, _>::default();
Expand Down Expand Up @@ -226,10 +216,7 @@ impl FarmerMetrics {
sector_encoding_time,
sector_writing_time,
sector_plotting_time,
farm_size,
farm_plotted,
farm_expired,
farm_about_to_expire,
sectors_total,
sector_downloading,
sector_downloaded,
sector_encoding,
Expand Down Expand Up @@ -281,6 +268,73 @@ impl FarmerMetrics {
.inc();
}

/// Overwrites the `sectors_total` gauge for the given farm and sector state with `sectors`.
///
/// The gauge is addressed by the `farm_id` and `state` labels and is created on first use.
pub(super) fn update_sectors_total(
    &self,
    single_disk_farm_id: &SingleDiskFarmId,
    sectors: SectorIndex,
    state: SectorState,
) {
    let labels = vec![
        ("farm_id".to_string(), single_disk_farm_id.to_string()),
        ("state".to_string(), state.to_string()),
    ];
    let sector_count = i64::from(sectors);

    self.sectors_total.get_or_create(&labels).set(sector_count);
}

/// Records that one sector of the given farm has moved into `state`.
///
/// The `sectors_total` gauge for `state` is incremented and the gauge of the state the sector
/// is assumed to have left is decremented:
/// * `Plotted`: taken from `NotPlotted` if that gauge is positive, otherwise from `Expired` if
///   that gauge is positive, otherwise from `AboutToExpire`
/// * `AboutToExpire` and `Expired`: taken from `Plotted`
/// * `NotPlotted`: nothing is decremented
///
/// Each gauge handle obtained from `get_or_create` is released before the next one is
/// requested. NOTE(review): in `prometheus_client` the handle is a read guard on the family's
/// internal lock and creating a missing label set takes the write lock, so holding one handle
/// while requesting another can deadlock; confirm against the pinned crate version.
pub(super) fn update_sector_state(
    &self,
    single_disk_farm_id: &SingleDiskFarmId,
    state: SectorState,
) {
    let labels_for = |sector_state: SectorState| {
        vec![
            ("farm_id".to_string(), single_disk_farm_id.to_string()),
            ("state".to_string(), sector_state.to_string()),
        ]
    };
    // Decrements the gauge of `source` if it is positive and reports whether it did so. The
    // gauge handle never outlives this closure call.
    let take_one_from = |source: SectorState| -> bool {
        let gauge = self.sectors_total.get_or_create(&labels_for(source));
        let has_sectors = gauge.get() > 0;
        if has_sectors {
            gauge.dec();
        }
        has_sectors
    };

    self.sectors_total.get_or_create(&labels_for(state)).inc();

    match state {
        SectorState::NotPlotted => {
            // Never called, doesn't make sense
        }
        SectorState::Plotted => {
            // Initial plotting first, then replacement of an expired sector
            if !take_one_from(SectorState::NotPlotted) && !take_one_from(SectorState::Expired) {
                // Replaced about to expire sector
                self.sectors_total
                    .get_or_create(&labels_for(SectorState::AboutToExpire))
                    .dec();
            }
        }
        SectorState::AboutToExpire | SectorState::Expired => {
            self.sectors_total
                .get_or_create(&labels_for(SectorState::Plotted))
                .dec();
        }
    }
}

pub(super) fn observe_sector_downloading_time(
&self,
single_disk_farm_id: &SingleDiskFarmId,
Expand Down Expand Up @@ -332,100 +386,4 @@ impl FarmerMetrics {
)])
.observe(time.as_secs_f64());
}

/// Sets the `farm_size` gauge of the given farm to `sectors`.
pub(super) fn update_farm_size(
    &self,
    single_disk_farm_id: &SingleDiskFarmId,
    sectors: SectorIndex,
) {
    let farm_labels = vec![("farm_id".to_string(), single_disk_farm_id.to_string())];
    let sector_count = i64::from(sectors);

    self.farm_size.get_or_create(&farm_labels).set(sector_count);
}

/// Adds `sectors` to the `farm_plotted` gauge of the given farm.
pub(super) fn inc_farm_plotted(
    &self,
    single_disk_farm_id: &SingleDiskFarmId,
    sectors: SectorIndex,
) {
    let farm_labels = vec![("farm_id".to_string(), single_disk_farm_id.to_string())];
    let delta = i64::from(sectors);

    self.farm_plotted.get_or_create(&farm_labels).inc_by(delta);
}

/// Moves `sectors` from the plotted to the expired count of the given farm.
///
/// `farm_expired` is increased first, then `farm_plotted` is decreased by the same amount.
pub(super) fn inc_farm_expired(
    &self,
    single_disk_farm_id: &SingleDiskFarmId,
    sectors: SectorIndex,
) {
    let farm_labels = vec![("farm_id".to_string(), single_disk_farm_id.to_string())];
    let delta = i64::from(sectors);

    self.farm_expired.get_or_create(&farm_labels).inc_by(delta);
    self.farm_plotted.get_or_create(&farm_labels).dec_by(delta);
}

/// Moves `sectors` from the plotted to the about-to-expire count of the given farm.
///
/// `farm_about_to_expire` is increased first, then `farm_plotted` is decreased by the same
/// amount.
pub(super) fn inc_farm_about_to_expire(
    &self,
    single_disk_farm_id: &SingleDiskFarmId,
    sectors: SectorIndex,
) {
    let farm_labels = vec![("farm_id".to_string(), single_disk_farm_id.to_string())];
    let delta = i64::from(sectors);

    self.farm_about_to_expire
        .get_or_create(&farm_labels)
        .inc_by(delta);
    self.farm_plotted.get_or_create(&farm_labels).dec_by(delta);
}

/// Accounts for a single replotted sector of the given farm.
///
/// `farm_plotted` is incremented by one; the sector is then taken out of `farm_expired` if
/// that gauge is positive and out of `farm_about_to_expire` otherwise.
pub(super) fn inc_farm_replotted(&self, single_disk_farm_id: &SingleDiskFarmId) {
    let farm_labels = vec![("farm_id".to_string(), single_disk_farm_id.to_string())];

    self.farm_plotted.get_or_create(&farm_labels).inc();

    let has_expired_sectors = self.farm_expired.get_or_create(&farm_labels).get() > 0;
    let replaced_from = if has_expired_sectors {
        &self.farm_expired
    } else {
        &self.farm_about_to_expire
    };
    replaced_from.get_or_create(&farm_labels).dec();
}
}
9 changes: 7 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,13 @@ impl SingleDiskFarm {
}

/// Number of sectors successfully plotted so far
pub async fn plotted_sectors_count(&self) -> usize {
self.sectors_metadata.read().await.len()
pub async fn plotted_sectors_count(&self) -> SectorIndex {
// Awaits `read()` on the sectors metadata and counts its entries
self.sectors_metadata
.read()
.await
.len()
// Narrow the length to `SectorIndex`; panics with the message below if it does not fit
.try_into()
.expect("Number of sectors never exceeds `SectorIndex` type; qed")
}

/// Read information about sectors plotted so far
Expand Down
Loading