fix: search platforms separately to keep under 10,000 results (#48)
* Search for platforms separately to stay <10,000 link results

* Update root Makefile for ruff switch

* Update unit tests

* Mock scihub search supports platform query

* Fix test of granule count (5 rows for 5 days, per platform)

* Revert "Mock scihub search supports platform query"

This reverts commit 4841e3c.

* Revert "Revert "Mock scihub search supports platform query""

This reverts commit 50a5116.

* Remove non-platform specific responses

* Update expected number of granules for 4x100 results

* fix query->filter
ceholden authored Dec 10, 2024
1 parent 5d81de9 commit 7a62b00
Showing 25 changed files with 34,599 additions and 27,154 deletions.
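Context for the change: the Copernicus Data Space Ecosystem catalogue will not page past 10,000 results for a single query, so a query window whose combined S2A+S2B results exceed that cap cannot be fetched in full. Issuing one query per platform keeps each result set under the limit. A minimal sketch of the idea, with a hypothetical `search()` standing in for the real paged catalogue call in `lambdas/link_fetcher/handler.py`:

```python
from typing import Any, Dict, List

RESULT_CAP = 10_000  # the catalogue will not page past this many results per query


def search(params: Dict[str, Any]) -> List[dict]:
    """Hypothetical stand-in for the paged catalogue query."""
    return []


def fetch_day(day: str) -> List[dict]:
    granules: List[dict] = []
    for platform in ("S2A", "S2B"):
        # One query per platform keeps each result set under RESULT_CAP,
        # where a single combined S2A+S2B query could exceed it.
        granules.extend(search({"startDate": day, "platform": platform}))
    return granules
```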
3 changes: 1 addition & 2 deletions Makefile
@@ -49,8 +49,7 @@ lint:
$(MAKE) -C alembic_migration lint

format:
-	pipenv run isort --profile black cdk/ integration_tests/
-	pipenv run black cdk/ integration_tests/
+	pipenv run ruff format cdk/ integration_tests/
$(MAKE) -C lambdas/link_fetcher format
$(MAKE) -C lambdas/date_generator format
$(MAKE) -C lambdas/downloader format
5 changes: 3 additions & 2 deletions alembic_migration/env.py
@@ -4,6 +4,7 @@

import boto3
from alembic import context
+from db.models.base import Base
from sqlalchemy import engine_from_config, pool
from sqlalchemy.engine import url

@@ -19,7 +20,7 @@
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
-target_metadata = None
+target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
@@ -30,7 +31,7 @@
def get_url() -> url.URL:
"""
Returns a SQLAlchemy `engine.url.URL`
-    based on a AWS SecretsManager Secret, whos ARN is available as a environment
+    based on an AWS SecretsManager Secret, whose ARN is available as an environment
variable named DB_CONNECTION_SECRET_ARN
:returns: URL representing a sqlalchemy url for the database
"""
@@ -0,0 +1,56 @@
"""Granule count by platform
Revision ID: ec89745f0bac
Revises: 1c8c38951d47
Create Date: 2024-11-25 18:13:36.964410
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "ec89745f0bac"
down_revision = "1c8c38951d47"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"granule_count_pkey",
"granule_count",
"primary",
)
op.add_column(
"granule_count",
sa.Column(
"platform",
sa.String(),
nullable=False,
server_default="S2A+S2B",
),
)
op.create_primary_key(
"granule_count_pkey",
"granule_count",
["date", "platform"],
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"granule_count_pkey",
"granule_count",
"primary",
)
op.drop_column("granule_count", "platform")
op.create_primary_key(
"granule_count_pkey",
"granule_count",
["date"],
)
# ### end Alembic commands ###
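The migration widens the `granule_count` primary key from `date` alone to `(date, platform)`; the `server_default="S2A+S2B"` backfills pre-existing rows so old combined counts stay distinguishable from new per-platform rows. A sketch of the resulting model, with column names taken from this commit's diffs (the real model lives in `db.models` and may differ in detail):

```python
import datetime as dt

from sqlalchemy import Date, DateTime, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class GranuleCount(Base):
    __tablename__ = "granule_count"

    # Composite primary key: one counter row per (day, platform).
    date: Mapped[dt.date] = mapped_column(Date, primary_key=True)
    platform: Mapped[str] = mapped_column(
        String, primary_key=True, server_default="S2A+S2B"
    )
    available_links: Mapped[int] = mapped_column(Integer)
    fetched_links: Mapped[int] = mapped_column(Integer)
    last_fetched_time: Mapped[dt.datetime] = mapped_column(DateTime)
```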
4 changes: 2 additions & 2 deletions cdk/downloader_stack.py
@@ -422,8 +422,8 @@ def __init__(
link_fetcher_map_task = sfn.Map(
self,
id=f"{identifier}-link-fetcher-map",
input_path="$.Payload.query_dates",
parameters={"query_date.$": "$$.Map.Item.Value"},
input_path="$.Payload.query_dates_platforms",
parameters={"query_date_platform.$": "$$.Map.Item.Value"},
max_concurrency=3,
).iterator(
link_fetcher_task.next(
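With `input_path` now pointing at `$.Payload.query_dates_platforms`, the Step Functions Map state fans out over (date, platform) pairs — 10 iterations for 5 days × 2 platforms, at most 3 at a time per `max_concurrency` — and each iteration's link fetcher receives the current array element under `query_date_platform`. Illustratively (date made up):

```python
# Input to a single Map iteration; "$$.Map.Item.Value" selects the
# current element of query_dates_platforms.
iteration_input = {"query_date_platform": ["2024-12-08", "S2A"]}
```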
30 changes: 21 additions & 9 deletions integration_tests/test_link_fetching.py
@@ -44,16 +44,22 @@ def test_that_link_fetching_invocation_executes_correctly(
)

granules = db_session.query(Granule).all()
-    assert_that(granules).is_length(78)
-
-    granule_counts = db_session.query(GranuleCount).all()
-    assert_that(granule_counts).is_length(5)
+    assert_that(granules).is_length(222)
+
+    # 5 days of granule count per platform
+    for platform in ("S2A", "S2B"):
+        granule_counts = (
+            db_session.query(GranuleCount)
+            .filter(GranuleCount.platform == platform)
+            .all()
+        )
+        assert_that(granule_counts).is_length(5)

statuses = db_session.query(Status).all()
assert_that(statuses).is_length(1)

polling2.poll(
-        check_sqs_message_count, args=(sqs_client, queue_url, 78), step=5, timeout=120
+        check_sqs_message_count, args=(sqs_client, queue_url, 222), step=5, timeout=120
)


@@ -93,7 +99,7 @@ def test_that_link_fetching_invocation_executes_correctly_when_a_duplicate_granu
)

granules = db_session.query(Granule).all()
-    assert_that(granules).is_length(78)
+    assert_that(granules).is_length(222)

# Assert that the original granule we added is still there
granule_we_inserted = (
@@ -104,12 +110,18 @@
assert_that(granule_we_inserted.tileid).is_equal_to("TS101")
assert_that(granule_we_inserted.download_url).is_equal_to("A download url")

-    granule_counts = db_session.query(GranuleCount).all()
-    assert_that(granule_counts).is_length(5)
+    # 5 days of granule count per platform
+    for platform in ("S2A", "S2B"):
+        granule_counts = (
+            db_session.query(GranuleCount)
+            .filter(GranuleCount.platform == platform)
+            .all()
+        )
+        assert_that(granule_counts).is_length(5)

statuses = db_session.query(Status).all()
assert_that(statuses).is_length(1)

polling2.poll(
-        check_sqs_message_count, args=(sqs_client, queue_url, 77), step=5, timeout=120
+        check_sqs_message_count, args=(sqs_client, queue_url, 221), step=5, timeout=120
)
8 changes: 6 additions & 2 deletions lambdas/date_generator/handler.py
@@ -1,16 +1,20 @@
from datetime import datetime, timedelta
+from itertools import product
from typing import List


def handler(event, context):
return {"query_dates": get_dates()}
platforms = ["S2A", "S2B"]
return {
"query_dates_platforms": list(product(get_dates(), platforms)),
}


def get_dates() -> List[str]:
"""
Returns 5 date strings from `datetime.now() - 1 day` with the latest day first
Strings are formatted as %Y-%m-%d
-    :returns: List[str] representing 21 days from yesterday
+    :returns: List[str] representing 5 days from yesterday
"""
yesterdays_date = datetime.now().date() - timedelta(days=1)
return [
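The handler now emits the Cartesian product of the five query dates and the two platforms, so the Map state above iterates 10 (date, platform) pairs instead of 5 bare dates. With made-up dates:

```python
>>> from itertools import product
>>> dates = ["2024-12-09", "2024-12-08"]  # get_dates() returns five of these
>>> list(product(dates, ["S2A", "S2B"]))
[('2024-12-09', 'S2A'), ('2024-12-09', 'S2B'), ('2024-12-08', 'S2A'), ('2024-12-08', 'S2B')]
```

In the Lambda payload these tuples serialize as two-element JSON arrays, which is the `[query_date, query_platform]` shape the link fetcher unpacks.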
6 changes: 5 additions & 1 deletion lambdas/date_generator/tests/test_date_generator_handler.py
@@ -1,4 +1,5 @@
from datetime import datetime
+from itertools import product

from assertpy import assert_that
from freezegun import freeze_time
@@ -20,6 +21,9 @@ def test_that_date_generator_handler_returns_correct_dates():
expected_dates = [
datetime(2020, 4, i).date().strftime("%Y-%m-%d") for i in range(21, 16, -1)
]
expected_handler_output = {"query_dates": expected_dates}
expected_platforms = ["S2A", "S2B"]
expected_handler_output = {
"query_dates_platforms": list(product(expected_dates, expected_platforms))
}
actual_handler_output = handler(None, None)
assert_that(expected_handler_output).is_equal_to(actual_handler_output)
64 changes: 49 additions & 15 deletions lambdas/link_fetcher/handler.py
@@ -8,6 +8,7 @@
Any,
Callable,
Final,
+    Literal,
Mapping,
Protocol,
Sequence,
@@ -39,6 +40,7 @@
"SEARCH_URL",
"https://catalogue.dataspace.copernicus.eu",
)
+Platform = Literal["S2A", "S2B"]


@dataclass(frozen=True)
@@ -72,14 +74,16 @@ def _handler(
session_maker: SessionMaker,
) -> HandlerResult:
accepted_tile_ids = get_accepted_tile_ids()
query_date = event["query_date"]
query_date, query_platform = event["query_date_platform"]
day = datetime.strptime(query_date, "%Y-%m-%d").date()

-    fetched_links = get_fetched_links(session_maker, day)
-    params = get_query_parameters(fetched_links, day)
+    fetched_links = get_fetched_links(session_maker, day, query_platform)
+    params = get_query_parameters(fetched_links, day, query_platform)
search_results, total_results = get_page_for_query_and_total_results(params)
print(f"Previously fetched links for {query_date}: {fetched_links}/{total_results}")
update_total_results(session_maker, day, total_results)
print(
f"Previously fetched links for {query_date}/{query_platform}: {fetched_links}/{total_results}"
)
update_total_results(session_maker, day, query_platform, total_results)
bail_early = False

while search_results:
@@ -89,18 +93,25 @@
)
add_search_results_to_db_and_sqs(session_maker, filtered_search_results)
update_last_fetched_link_time(session_maker)
-        update_fetched_links(session_maker, day, number_of_fetched_links)
+        update_fetched_links(
+            session_maker, day, query_platform, number_of_fetched_links
+        )

params = {**params, "index": params["index"] + number_of_fetched_links}
print(f"Fetched links for {query_date}: {params['index'] - 1}/{total_results}")
print(
f"Fetched links for {query_date}/{query_platform}: {params['index'] - 1}/{total_results}"
)

if bail_early := context.get_remaining_time_in_millis() < MIN_REMAINING_MILLIS:
print("Bailing early to avoid Lambda timeout")
break

search_results, _ = get_page_for_query_and_total_results(params)

return {"query_date": query_date, "completed": not bail_early}
return {
"query_date_platform": [query_date, query_platform],
"completed": not bail_early,
}


def add_search_results_to_db_and_sqs(
@@ -163,7 +174,9 @@ def add_search_result_to_sqs(
)


-def get_fetched_links(session_maker: SessionMaker, day: date) -> int:
+def get_fetched_links(
+    session_maker: SessionMaker, day: date, platform: Platform
+) -> int:
"""
For a given day, return the total
`fetched_links`, where `fetched_links` is the total number of granules that have
Expand All @@ -173,16 +186,20 @@ def get_fetched_links(session_maker: SessionMaker, day: date) -> int:
:param session_maker: sessionmaker representing the SQLAlchemy sessionmaker to use
for database interactions
:param day: date representing the day to return results for
+    :param platform: Sentinel-2 platform to search for (S2A, S2B, etc)
:returns: int representing `fetched_links`
"""
with session_maker() as session:
-        granule_count = session.query(GranuleCount).filter_by(date=day).first()
+        granule_count = (
+            session.query(GranuleCount).filter_by(date=day, platform=platform).first()
+        )

if granule_count:
return granule_count.fetched_links

granule_count = GranuleCount(
date=day,
+            platform=platform,
available_links=0,
fetched_links=0,
last_fetched_time=datetime.now(),
@@ -193,17 +210,24 @@
return 0


-def update_total_results(session_maker: SessionMaker, day: date, total_results: int):
+def update_total_results(
+    session_maker: SessionMaker, day: date, platform: Platform, total_results: int
+):
"""
For a given day and number of results, update the `available_links` value
:param session_maker: sessionmaker representing the SQLAlchemy sessionmaker to use
for database interactions
:param day: date representing the day to update `available_links` for
+    :param platform: Sensor platform (S2A, S2B, etc)
:param total_results: int representing the total results available for the day,
this value will be applied to `available_links`
"""
with session_maker() as session:
-        if granule_count := session.query(GranuleCount).filter_by(date=day).first():
+        if (
+            granule_count := session.query(GranuleCount)
+            .filter_by(date=day, platform=platform)
+            .first()
+        ):
granule_count.available_links = total_results
session.commit()

@@ -230,18 +254,25 @@ def update_last_fetched_link_time(session_maker: SessionMaker):
session.commit()


-def update_fetched_links(session_maker: SessionMaker, day: date, fetched_links: int):
+def update_fetched_links(
+    session_maker: SessionMaker, day: date, platform: Platform, fetched_links: int
+):
"""
For a given day, update the `fetched_links` value in `granule_count` to the provided
`fetched_links` value and update the `last_fetched_time` value to `datetime.now()`
:param session_maker: sessionmaker representing the SQLAlchemy sessionmaker to use
for database interactions
:param day: date representing the day to update in `granule_count`
+    :param platform: Sentinel-2 platform to search for (S2A, S2B, etc)
:param fetched_links: int representing the total number of links fetched in this run
it is not the total number of Granules created
"""
with session_maker() as session:
-        if granule_count := session.query(GranuleCount).filter_by(date=day).first():
+        if (
+            granule_count := session.query(GranuleCount)
+            .filter_by(date=day, platform=platform)
+            .first()
+        ):
granule_count.fetched_links += fetched_links
granule_count.last_fetched_time = datetime.now()
session.commit()
@@ -281,7 +312,9 @@ def filter_search_results(
)


-def get_query_parameters(start: int, day: date) -> Mapping[str, Any]:
+def get_query_parameters(
+    start: int, day: date, platform: Platform
+) -> Mapping[str, Any]:
"""
Returns the query parameters that are needed for getting new imagery from
search (Copernicus Data Space Ecosystem)
@@ -298,6 +331,7 @@ def get_query_parameters(
"publishedAfter": f"{date_string}T00:00:00Z",
"publishedBefore": f"{date_string}T23:59:59Z",
"startDate": f"{oldest_acquisition_date.strftime('%Y-%m-%d')}T00:00:00Z",
"platform": platform,
"sortParam": "published",
"sortOrder": "desc",
"maxRecords": 2000,
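Adding `platform` to the query parameters is the heart of the fix: each invocation now pages through one platform's results for one day, 2,000 records at a time, keeping the paging index safely below the service's 10,000-result ceiling. A sketch of what one page's parameters plausibly look like (dates and the initial `index` are illustrative; only keys visible in this diff are shown):

```python
params = {
    "publishedAfter": "2024-12-08T00:00:00Z",
    "publishedBefore": "2024-12-08T23:59:59Z",
    "startDate": "2024-11-08T00:00:00Z",  # oldest acquisition date cutoff
    "platform": "S2A",
    "sortParam": "published",
    "sortOrder": "desc",
    "maxRecords": 2000,
    "index": 1,  # advanced by the number of fetched links after each page
}
```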
1 change: 1 addition & 0 deletions lambdas/link_fetcher/tests/conftest.py
@@ -165,6 +165,7 @@ def generate_mock_responses_for_one_day(mock_search_response):
"&publishedAfter={0}T00:00:00Z"
"&publishedBefore={0}T23:59:59Z"
"&startDate=2019-12-02T00:00:00Z"
"&platform=S2A"
"&sortParam=published"
"&sortOrder=desc"
"&maxRecords=2000"