Skip to content

Commit

Permalink
[NFT Metadata Crawler] Add column to mark URIs that should not be parsed (#11846)
Browse files Browse the repository at this point in the history
  • Loading branch information
just-in-chang authored Feb 1, 2024
1 parent e875621 commit 8d7d6f0
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Down migration: remove the do_not_parse flag, reverting the corresponding up migration.
ALTER TABLE nft_metadata_crawler.parsed_asset_uris DROP COLUMN do_not_parse;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Up migration: add a flag marking URIs the crawler should skip.
-- NOT NULL DEFAULT FALSE keeps all existing rows parseable without a backfill.
ALTER TABLE nft_metadata_crawler.parsed_asset_uris ADD COLUMN do_not_parse BOOLEAN NOT NULL DEFAULT FALSE;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct NFTMetadataCrawlerURIs {
json_parser_retry_count: i32,
image_optimizer_retry_count: i32,
animation_optimizer_retry_count: i32,
do_not_parse: bool,
}

impl NFTMetadataCrawlerURIs {
Expand All @@ -33,6 +34,7 @@ impl NFTMetadataCrawlerURIs {
json_parser_retry_count: 0,
image_optimizer_retry_count: 0,
animation_optimizer_retry_count: 0,
do_not_parse: false,
}
}

Expand Down Expand Up @@ -142,4 +144,12 @@ impl NFTMetadataCrawlerURIs {
/// Increments the retry counter for the animation-optimizer step by one.
pub fn increment_animation_optimizer_retry_count(&mut self) {
self.animation_optimizer_retry_count += 1;
}

/// Returns whether this URI has been flagged so the parser should skip it.
pub fn get_do_not_parse(&self) -> bool {
self.do_not_parse
}

/// Sets (or clears) the do-not-parse flag for this URI.
pub fn set_do_not_parse(&mut self, do_not_parse: bool) {
self.do_not_parse = do_not_parse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct NFTMetadataCrawlerURIsQuery {
pub image_optimizer_retry_count: i32,
pub animation_optimizer_retry_count: i32,
pub inserted_at: chrono::NaiveDateTime,
pub do_not_parse: bool,
}

impl NFTMetadataCrawlerURIsQuery {
Expand Down Expand Up @@ -102,3 +103,21 @@ impl NFTMetadataCrawlerURIsQuery {
})
}
}

impl Default for NFTMetadataCrawlerURIsQuery {
    /// Returns an "empty row" model: all optional URIs unset, retry counters
    /// zeroed, `do_not_parse` off, and the epoch timestamp for `inserted_at`.
    /// Used as a stand-in when no database row exists for an asset URI.
    fn default() -> Self {
        Self {
            // String::new() is the idiomatic (and allocation-free) empty string.
            asset_uri: String::new(),
            raw_image_uri: None,
            raw_animation_uri: None,
            cdn_json_uri: None,
            cdn_image_uri: None,
            cdn_animation_uri: None,
            json_parser_retry_count: 0,
            image_optimizer_retry_count: 0,
            animation_optimizer_retry_count: 0,
            inserted_at: chrono::NaiveDateTime::default(),
            do_not_parse: false,
        }
    }
}
1 change: 1 addition & 0 deletions ecosystem/nft-metadata-crawler-parser/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod nft_metadata_crawler {
image_optimizer_retry_count -> Int4,
animation_optimizer_retry_count -> Int4,
inserted_at -> Timestamp,
do_not_parse -> Bool,
}
}

Expand Down
9 changes: 9 additions & 0 deletions ecosystem/nft-metadata-crawler-parser/src/utils/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pub static PARSER_FAIL_COUNT: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

/// Number of times the NFT Metadata Crawler Parser has received a URI whose
/// `do_not_parse` flag is set; such URIs are skipped rather than parsed.
pub static DO_NOT_PARSE_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"nft_metadata_crawler_parser_do_not_parse_count",
"Number of times the parser received a URI marked as not to parse",
)
.unwrap()
});

// PUBSUB METRICS

/// Number of times a PubSub message has successfully been ACK'd
Expand Down
5 changes: 3 additions & 2 deletions ecosystem/nft-metadata-crawler-parser/src/utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ pub fn run_migrations(pool: &Pool<ConnectionManager<PgConnection>>) {
/// Upserts URIs into database
pub fn upsert_uris(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
entry: NFTMetadataCrawlerURIs,
entry: &NFTMetadataCrawlerURIs,
) -> anyhow::Result<usize> {
use schema::nft_metadata_crawler::parsed_asset_uris::dsl::*;

let query = diesel::insert_into(schema::nft_metadata_crawler::parsed_asset_uris::table)
.values(&entry)
.values(entry)
.on_conflict(asset_uri)
.do_update()
.set((
Expand All @@ -51,6 +51,7 @@ pub fn upsert_uris(
image_optimizer_retry_count.eq(excluded(image_optimizer_retry_count)),
json_parser_retry_count.eq(excluded(json_parser_retry_count)),
animation_optimizer_retry_count.eq(excluded(animation_optimizer_retry_count)),
do_not_parse.eq(excluded(do_not_parse)),
));

let debug_query = diesel::debug_query::<diesel::pg::Pg, _>(&query).to_string();
Expand Down
2 changes: 1 addition & 1 deletion ecosystem/nft-metadata-crawler-parser/src/utils/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::Duration;
pub async fn write_json_to_gcs(
bucket: String,
id: String,
json: Value,
json: &Value,
client: &Client,
) -> anyhow::Result<String> {
GCS_UPLOAD_INVOCATION_COUNT.inc();
Expand Down
36 changes: 27 additions & 9 deletions ecosystem/nft-metadata-crawler-parser/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,19 @@ impl Worker {
pub async fn parse(&mut self) -> anyhow::Result<()> {
// Deduplicate asset_uri
// Exit if not force or if asset_uri has already been parsed
if !self.force
&& NFTMetadataCrawlerURIsQuery::get_by_asset_uri(self.asset_uri.clone(), &mut self.conn)
.is_some()
{
let prev_model =
NFTMetadataCrawlerURIsQuery::get_by_asset_uri(self.asset_uri.clone(), &mut self.conn);
if !self.force && prev_model.is_some() {
self.log_info("Duplicate asset_uri found, skipping parse");
DUPLICATE_ASSET_URI_COUNT.inc();
return Ok(());
}

if prev_model.unwrap_or_default().do_not_parse {
self.log_info("do_not_parse is true, skipping parse");
return Ok(());
}

// Skip if asset_uri contains any of the uris in URI_SKIP_LIST
if let Some(blacklist) = self.config.uri_blacklist.clone() {
if blacklist.iter().any(|uri| self.asset_uri.contains(uri)) {
Expand All @@ -326,8 +330,12 @@ impl Worker {

// Skip if asset_uri is not a valid URI
if Url::parse(&self.asset_uri).is_err() {
self.log_info("URI is invalid, skipping parse");
self.log_info("URI is invalid, skipping parse, marking as do_not_parse");
self.model.set_do_not_parse(true);
SKIP_URI_COUNT.with_label_values(&["invalid"]).inc();
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
return Ok(());
}

Expand Down Expand Up @@ -369,7 +377,7 @@ impl Worker {
let cdn_json_uri_result = write_json_to_gcs(
self.config.bucket.clone(),
self.asset_data_id.clone(),
json,
&json,
&self.gcs_client,
)
.await;
Expand All @@ -389,7 +397,7 @@ impl Worker {

// Commit model to Postgres
self.log_info("Committing JSON to Postgres");
if let Err(e) = upsert_uris(&mut self.conn, self.model.clone()) {
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}

Expand Down Expand Up @@ -453,6 +461,16 @@ impl Worker {
(vec![], ImageFormat::Png)
});

if image.is_empty() && json == Value::Null {
self.log_info("Image and JSON are empty, skipping parse, marking as do_not_parse");
self.model.set_do_not_parse(true);
SKIP_URI_COUNT.with_label_values(&["empty"]).inc();
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
return Ok(());
}

// Save resized and optimized image to GCS
if !image.is_empty() {
self.log_info("Writing image to GCS");
Expand Down Expand Up @@ -481,7 +499,7 @@ impl Worker {

// Commit model to Postgres
self.log_info("Committing image to Postgres");
if let Err(e) = upsert_uris(&mut self.conn, self.model.clone()) {
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}

Expand Down Expand Up @@ -570,7 +588,7 @@ impl Worker {

// Commit model to Postgres
self.log_info("Committing animation to Postgres");
if let Err(e) = upsert_uris(&mut self.conn, self.model.clone()) {
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}

Expand Down

0 comments on commit 8d7d6f0

Please sign in to comment.